如何加速 spark df.write jdbc 到 postgres 数据库?
How to speed up spark df.write jdbc to postgres database?
我是 spark 的新手,我正在尝试使用 df.write:
加快将数据框的内容(可能有 200k 到 2M 行)附加到 postgres 数据库
df.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
我尝试增加批处理大小但没有帮助,因为完成此任务仍然需要大约 4 小时。我还在下面包含了一些来自 aws emr 的快照,显示了有关作业 运行 的更多详细信息。将数据框保存到 postgres table 的任务只分配给了一个执行者(我发现它是 st运行ge),加速这个任务是否涉及在执行者之间划分这个任务?
此外,我已阅读 spark's performance tuning docs,但增加 batchsize
和 queryTimeout
似乎并未提高性能。 (我尝试在 df.write
之前在我的脚本中调用 df.cache()
,但脚本的运行时间仍然是 4 小时)
此外,我的 aws emr 硬件设置和 spark-submit
是:
主节点(1):m4.xlarge
核心节点(2):m5.xlarge
spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...
要解决性能问题,一般需要解决以下2个瓶颈:
- 确保 spark 作业正在将数据并行写入数据库 -
要解决此问题,请确保您有一个分区数据框。使用“
df.repartition(n)
”对数据帧进行分区,以便每个分区都并行写入数据库。
注意 - 大量的执行者也会导致插入速度变慢。所以从 5 个分区开始,然后将分区数增加 5,直到获得最佳性能。
- 确保数据库具有摄取大量数据所需的足够计算、内存和存储空间。
Spark 是一个分布式数据处理引擎,因此当您处理数据或将其保存在文件系统上时,它会使用其所有执行程序来执行任务。
Spark JDBC 很慢,因为当您建立 JDBC 连接时,其中一个执行程序会建立 link 到目标数据库,因此导致速度慢和失败。
要克服此问题并加快数据写入数据库的速度,您需要使用以下方法之一:
方法一:
在这种方法中,您需要使用 postgres COPY 命令实用程序 以加快写入操作。这要求您的 EMR 集群上有 psycopg2 库。
COPY 实用程序的文档是 here
如果您想了解基准差异以及复制速度更快的原因,请访问 !
Postgres 还建议使用 COPY 命令进行批量插入。现在如何批量插入 spark 数据框。
现在要实现更快的写入,首先将您的 spark 数据帧以 csv 格式保存到 EMR 文件系统,并重新分区您的输出,以便没有文件包含超过 100k 行。
#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data)
现在使用python读取文件并对每个文件执行复制命令。
import psycopg2
#iterate over your files here and generate file object you can also get files list using os module
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')
#define a function
def execute_copy(fileName):
con = psycopg2.connect(database=dbname,user=user,password=password,host=host,port=port)
cursor = con.cursor()
cursor.copy_from(fileName, 'table_name', sep=",")
con.commit()
con.close()
为了获得额外的速度提升,因为您使用的是 EMR 集群,您可以利用 python 多处理一次复制多个文件。
from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
print(p.map(execute_copy, [file,file1]))
这是推荐的方法,因为由于连接限制,无法调整 spark JDBC 以获得更高的写入速度。
方法二:
由于您已经在使用 AWS EMR 集群,因此您始终可以利用 hadoop 功能来更快地执行 table 写入。
所以在这里我们将使用 sqoop export 将我们的数据从 emrfs 导出到 postgres 数据库。
#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\N' --input-null-non-string '\N' --direct -m 16
#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\N' --input-null-non-string '\N' --direct -m 16
为什么选择 sqoop?
因为sqoop是根据指定的mapper个数与数据库开启多个连接。因此,如果您将 -m 指定为 8,那么将有 8 个并发连接流,并且这些连接流会将数据写入 postgres。
此外,有关使用 sqoop 的更多信息,请阅读此 AWS Blog, SQOOP Considerations and SQOOP Documentation。
如果您可以使用代码破解您的方法,那么方法 1 肯定会给您带来您所寻求的性能提升,如果您对 table 像 SQOOP 这样的 hadoop 组件感到满意,那么请使用第二种方法。
希望对您有所帮助!
Spark 端调整 => 在 Datafarme 上执行 repartition
以便有多个执行程序并行写入数据库
df
.repartition(10) // No. of concurrent connection Spark to PostgreSQL
.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
Postgresql 端调优=>
在 PostgreSQL 上需要分别提高以下参数。
max_connections
决定最大并发数
与数据库服务器的连接。默认值通常为 100
连接。
shared_buffers
配置参数决定多少
内存专供 PostgreSQL 用于缓存数据。
通过重新分区数据框,您可以获得更好的写入性能,这是一个已知的答案。但是有一种重新分区数据框的最佳方法。
由于您是 运行 EMR 集群上的此过程,因此首先了解每个从属实例上 运行 的实例类型和核心数。根据该指定数据帧上的分区数。
在您的情况下,您使用的是 m5.xlarge(2 slaves),每个有 4 个 vCPU,这意味着每个实例有 4 个线程。因此,当您处理大量数据时,8 个分区将为您提供最佳结果。
注意:应根据您的数据大小增加或减少分区数。
注意:批量大小也是您在写入时应考虑的因素。批量越大性能越好
我是 spark 的新手,我正在尝试使用 df.write:
加快将数据框的内容(可能有 200k 到 2M 行)附加到 postgres 数据库df.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
我尝试增加批处理大小但没有帮助,因为完成此任务仍然需要大约 4 小时。我还在下面包含了一些来自 aws emr 的快照,显示了有关作业 运行 的更多详细信息。将数据框保存到 postgres table 的任务只分配给了一个执行者(我发现它是 st运行ge),加速这个任务是否涉及在执行者之间划分这个任务?
此外,我已阅读 spark's performance tuning docs,但增加 batchsize
和 queryTimeout
似乎并未提高性能。 (我尝试在 df.write
之前在我的脚本中调用 df.cache()
,但脚本的运行时间仍然是 4 小时)
此外,我的 aws emr 硬件设置和 spark-submit
是:
主节点(1):m4.xlarge
核心节点(2):m5.xlarge
spark-submit --deploy-mode client --executor-cores 4 --num-executors 4 ...
要解决性能问题,一般需要解决以下2个瓶颈:
- 确保 spark 作业正在将数据并行写入数据库 -
要解决此问题,请确保您有一个分区数据框。使用“
df.repartition(n)
”对数据帧进行分区,以便每个分区都并行写入数据库。 注意 - 大量的执行者也会导致插入速度变慢。所以从 5 个分区开始,然后将分区数增加 5,直到获得最佳性能。 - 确保数据库具有摄取大量数据所需的足够计算、内存和存储空间。
Spark 是一个分布式数据处理引擎,因此当您处理数据或将其保存在文件系统上时,它会使用其所有执行程序来执行任务。 Spark JDBC 很慢,因为当您建立 JDBC 连接时,其中一个执行程序会建立 link 到目标数据库,因此导致速度慢和失败。
要克服此问题并加快数据写入数据库的速度,您需要使用以下方法之一:
方法一:
在这种方法中,您需要使用 postgres COPY 命令实用程序 以加快写入操作。这要求您的 EMR 集群上有 psycopg2 库。
COPY 实用程序的文档是 here
如果您想了解基准差异以及复制速度更快的原因,请访问
Postgres 还建议使用 COPY 命令进行批量插入。现在如何批量插入 spark 数据框。 现在要实现更快的写入,首先将您的 spark 数据帧以 csv 格式保存到 EMR 文件系统,并重新分区您的输出,以便没有文件包含超过 100k 行。
#Repartition your dataframe dynamically based on number of rows in df
df.repartition(10).write.option("maxRecordsPerFile", 100000).mode("overwrite").csv("path/to/save/data)
现在使用python读取文件并对每个文件执行复制命令。
import psycopg2
#iterate over your files here and generate file object you can also get files list using os module
file = open('path/to/save/data/part-00000_0.csv')
file1 = open('path/to/save/data/part-00000_1.csv')
#define a function
def execute_copy(fileName):
con = psycopg2.connect(database=dbname,user=user,password=password,host=host,port=port)
cursor = con.cursor()
cursor.copy_from(fileName, 'table_name', sep=",")
con.commit()
con.close()
为了获得额外的速度提升,因为您使用的是 EMR 集群,您可以利用 python 多处理一次复制多个文件。
from multiprocessing import Pool, cpu_count
with Pool(cpu_count()) as p:
print(p.map(execute_copy, [file,file1]))
这是推荐的方法,因为由于连接限制,无法调整 spark JDBC 以获得更高的写入速度。
方法二: 由于您已经在使用 AWS EMR 集群,因此您始终可以利用 hadoop 功能来更快地执行 table 写入。 所以在这里我们将使用 sqoop export 将我们的数据从 emrfs 导出到 postgres 数据库。
#If you are using s3 as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir s3://mybucket/myinputfiles/ --driver org.postgresql.Driver --username master --password password --input-null-string '\N' --input-null-non-string '\N' --direct -m 16
#If you are using EMRFS as your source path
sqoop export --connect jdbc:postgresql:hostname:port/postgresDB --table target_table --export-dir /path/to/save/data/ --driver org.postgresql.Driver --username master --password password --input-null-string '\N' --input-null-non-string '\N' --direct -m 16
为什么选择 sqoop? 因为sqoop是根据指定的mapper个数与数据库开启多个连接。因此,如果您将 -m 指定为 8,那么将有 8 个并发连接流,并且这些连接流会将数据写入 postgres。
此外,有关使用 sqoop 的更多信息,请阅读此 AWS Blog, SQOOP Considerations and SQOOP Documentation。
如果您可以使用代码破解您的方法,那么方法 1 肯定会给您带来您所寻求的性能提升,如果您对 table 像 SQOOP 这样的 hadoop 组件感到满意,那么请使用第二种方法。
希望对您有所帮助!
Spark 端调整 => 在 Datafarme 上执行 repartition
以便有多个执行程序并行写入数据库
df
.repartition(10) // No. of concurrent connection Spark to PostgreSQL
.write.format('jdbc').options(
url=psql_url_spark,
driver=spark_env['PSQL_DRIVER'],
dbtable="{schema}.{table}".format(schema=schema, table=table),
user=spark_env['PSQL_USER'],
password=spark_env['PSQL_PASS'],
batchsize=2000000,
queryTimeout=690
).mode(mode).save()
Postgresql 端调优=> 在 PostgreSQL 上需要分别提高以下参数。
max_connections
决定最大并发数 与数据库服务器的连接。默认值通常为 100 连接。shared_buffers
配置参数决定多少 内存专供 PostgreSQL 用于缓存数据。
通过重新分区数据框,您可以获得更好的写入性能,这是一个已知的答案。但是有一种重新分区数据框的最佳方法。 由于您是 运行 EMR 集群上的此过程,因此首先了解每个从属实例上 运行 的实例类型和核心数。根据该指定数据帧上的分区数。 在您的情况下,您使用的是 m5.xlarge(2 slaves),每个有 4 个 vCPU,这意味着每个实例有 4 个线程。因此,当您处理大量数据时,8 个分区将为您提供最佳结果。
注意:应根据您的数据大小增加或减少分区数。
注意:批量大小也是您在写入时应考虑的因素。批量越大性能越好