在 PySpark SQL 中并行执行读取和写入 API 调用

Parallel execution of read and write API calls in PySpark SQL

我需要将 MySQL 中一组 table 的增量记录以 Parquet 格式加载到 Amazon S3。这些 table 在 AWS MySQL 托管实例中的几个 databases/schemas 中很常见。代码应该从每个模式(有一组公共的 tables)并行复制数据。

我正在使用读取 API PySpark SQL 连接到 MySQL 实例并读取每个 table 的数据以获取模式并将结果数据帧写入S3 使用 write API 作为 Parquet 文件。我 运行 在数据库中的每个 table 循环中,如下面的代码所示:

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver", mysql_db_properties['driver']) \
                    .option("url", row.database_connection_url) \
                    .option("dbtable", select_sql) \
                    .option("user", username) \
                    .option("password", password) \
                    .load()
            s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
            df.write.parquet(s3_path, mode="append") 

我想知道如何在 EMR 集群中将此代码并行扩展到多个数据库 运行。请给我建议 suitable 方法。如果需要更多详细信息,请告诉我。

您的 list_of_databases 未并行化。要进行并行处理,您应该并行化列表并使用 foreach 或 spark.

提供的内容来执行并行作业

在 EMR 中打开并发选项并为每个 table 发送 EMR 步骤,或者您可以使用 Spark 的公平调度程序,它可以在内部并行处理作业,只需对代码进行少量修改。

我可以提出两个解决方案:

1.简单的方法

一次向您的 EMR 提交多个作业(每个数据库一个作业)。如果监控有问题,只需将失败日志写入 S3 或 HDFS。

2。需要更改一些代码

您可以尝试使用线程来并行化从每个数据库中提取的数据。我可以展示如何操作的示例,但您可能需要进行更多更改以适合您的用例。

示例实现:

import threading

def load_data_to_s3(databases_df):
    db_query_properties = config['mysql-query']
    auto_id_values = config['mysql-auto-id-values']
    for row in databases_df.collect():
        for table in db_query_properties.keys():
            last_recorded_id_value = auto_id_values[table]
            select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
            df = spark.read.format("jdbc") \
                    .option("driver", mysql_db_properties['driver']) \
                    .option("url", row.database_connection_url) \
                    .option("dbtable", select_sql) \
                    .option("user", username) \
                    .option("password", password) \
                    .load()
            s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
            df.write.parquet(s3_path, mode="append") 

threads = [threading.Thread(target=load_data_to_s3, args=(db) for db in databases_df]
    
for t in threads:
    t.start()

for t in threads:
    t.join() 

此外,请确保使用 set('spark.scheduler.mode', 'FAIR') 属性 将调度程序更改为 FAIR。这将为您的每个数据库创建一个线程。如果要并行控制线程数运行,相应修改for循环。

此外,如果您想从程序中创建新作业,请将您的 SparkSession 与参数一起传递。