将数据并行写入 parquet 格式
Write data to parquet format in parallel
我有一个相对庞大的内部部署 table(约 15 亿行),我正尝试使用 AWS Glue 以镶木地板格式将其拉入 AWS S3。我正在使用 spark JDBC 读取 table 并将其写入 S3。问题是我无法一次性从源 table 中提取所有数据,因为源数据库会 运行 内存不足并抱怨。为了解决这个问题,我使用谓词选项并行下推过滤器,这可以很好地提取 2 亿左右的数据块。但是当我尝试将此数据帧写入 S3 时,它需要将近半小时才能完成:
df = spark.read.jdbc(url=host_url,
table="TABLENAME",
predicates=predicates,
properties= {
"user" : username,
"password" : password
}
)
所以我想做的是按顺序从数据库阶段读取:
Read Part 1 from DB --> Read Part 2 from DB --> Read Part 3 from DB
然后将所有数据并行写入S3
Write Part 1 || Write Part 2 || Write Part 3
我有两个问题:
- 我不知道 Spark 何时真正将这些查询发送到数据库。我知道当我定义数据帧时不是如上所示,所以我无法弄清楚如何序列化阶段 1。
- 我环顾四周,找不到将多个数据帧并行写入 parquet 分区的选项。我应该只使用 python 将数据帧并行化为 parquet 写入操作语句吗?这样做是否明智?
Spark 在应用操作后立即读取数据,因为您只是在读取和写入 s3,所以在触发写入时读取数据。
Spark 未针对从 rdbms 读取批量数据进行优化,因为它只建立与数据库的单一连接。
如果您想坚持阅读的火花,请尝试将 fetchsize 属性 增加到 100000,默认值为 1000。
对于数据的并行处理,您可以尝试利用 python 多处理并执行并行读取和写入
Thread 1
Read 1 -> Write 1
Thread 2
Read 2 -> Write 2
但第一次尝试执行只是顺序
Read 1 -> Write 1 -> Read 2 -> Write 2
我建议的另一种方法是使用 DMS 或 SCT 将所有数据一次传输到 s3。
DMS 可以在 s3 中以 parquet 格式转储数据,并且速度非常快,因为它针对迁移任务本身进行了优化。
如果您不想使用 DMS,您可以编写一个 sqoop 导入作业,该作业可以通过瞬态 EMR 集群触发。
Sqoop 还可以导入 parquet 格式的数据。
Glue 最适合转换现有数据和迁移大数据,您应该借助其他服务。
我有一个相对庞大的内部部署 table(约 15 亿行),我正尝试使用 AWS Glue 以镶木地板格式将其拉入 AWS S3。我正在使用 spark JDBC 读取 table 并将其写入 S3。问题是我无法一次性从源 table 中提取所有数据,因为源数据库会 运行 内存不足并抱怨。为了解决这个问题,我使用谓词选项并行下推过滤器,这可以很好地提取 2 亿左右的数据块。但是当我尝试将此数据帧写入 S3 时,它需要将近半小时才能完成:
df = spark.read.jdbc(url=host_url,
table="TABLENAME",
predicates=predicates,
properties= {
"user" : username,
"password" : password
}
)
所以我想做的是按顺序从数据库阶段读取:
Read Part 1 from DB --> Read Part 2 from DB --> Read Part 3 from DB
然后将所有数据并行写入S3
Write Part 1 || Write Part 2 || Write Part 3
我有两个问题:
- 我不知道 Spark 何时真正将这些查询发送到数据库。我知道当我定义数据帧时不是如上所示,所以我无法弄清楚如何序列化阶段 1。
- 我环顾四周,找不到将多个数据帧并行写入 parquet 分区的选项。我应该只使用 python 将数据帧并行化为 parquet 写入操作语句吗?这样做是否明智?
Spark 在应用操作后立即读取数据,因为您只是在读取和写入 s3,所以在触发写入时读取数据。
Spark 未针对从 rdbms 读取批量数据进行优化,因为它只建立与数据库的单一连接。 如果您想坚持阅读的火花,请尝试将 fetchsize 属性 增加到 100000,默认值为 1000。
对于数据的并行处理,您可以尝试利用 python 多处理并执行并行读取和写入
Thread 1
Read 1 -> Write 1
Thread 2
Read 2 -> Write 2
但第一次尝试执行只是顺序
Read 1 -> Write 1 -> Read 2 -> Write 2
我建议的另一种方法是使用 DMS 或 SCT 将所有数据一次传输到 s3。
DMS 可以在 s3 中以 parquet 格式转储数据,并且速度非常快,因为它针对迁移任务本身进行了优化。
如果您不想使用 DMS,您可以编写一个 sqoop 导入作业,该作业可以通过瞬态 EMR 集群触发。 Sqoop 还可以导入 parquet 格式的数据。
Glue 最适合转换现有数据和迁移大数据,您应该借助其他服务。