在 PySpark SQL 中并行执行读取和写入 API 调用
Parallel execution of read and write API calls in PySpark SQL
我需要将 MySQL 中一组 table 的增量记录以 Parquet 格式加载到 Amazon S3。这些 table 在 AWS MySQL 托管实例中的几个 databases/schemas 中很常见。代码应该从每个模式(有一组公共的 tables)并行复制数据。
我正在使用读取 API PySpark SQL 连接到 MySQL 实例并读取每个 table 的数据以获取模式并将结果数据帧写入S3 使用 write API 作为 Parquet 文件。我 运行 在数据库中的每个 table 循环中,如下面的代码所示:
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver", mysql_db_properties['driver']) \
.option("url", row.database_connection_url) \
.option("dbtable", select_sql) \
.option("user", username) \
.option("password", password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
df.write.parquet(s3_path, mode="append")
我想知道如何在 EMR 集群中将此代码并行扩展到多个数据库 运行。请给我建议 suitable 方法。如果需要更多详细信息,请告诉我。
您的 list_of_databases
未并行化。要进行并行处理,您应该并行化列表并使用 foreach
或 spark.
提供的内容来执行并行作业
在 EMR 中打开并发选项并为每个 table 发送 EMR 步骤,或者您可以使用 Spark 的公平调度程序,它可以在内部并行处理作业,只需对代码进行少量修改。
我可以提出两个解决方案:
1.简单的方法
一次向您的 EMR 提交多个作业(每个数据库一个作业)。如果监控有问题,只需将失败日志写入 S3 或 HDFS。
2。需要更改一些代码
您可以尝试使用线程来并行化从每个数据库中提取的数据。我可以展示如何操作的示例,但您可能需要进行更多更改以适合您的用例。
示例实现:
import threading
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver", mysql_db_properties['driver']) \
.option("url", row.database_connection_url) \
.option("dbtable", select_sql) \
.option("user", username) \
.option("password", password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
df.write.parquet(s3_path, mode="append")
threads = [threading.Thread(target=load_data_to_s3, args=(db) for db in databases_df]
for t in threads:
t.start()
for t in threads:
t.join()
此外,请确保使用 set('spark.scheduler.mode', 'FAIR')
属性 将调度程序更改为 FAIR。这将为您的每个数据库创建一个线程。如果要并行控制线程数运行,相应修改for循环。
此外,如果您想从程序中创建新作业,请将您的 SparkSession 与参数一起传递。
我需要将 MySQL 中一组 table 的增量记录以 Parquet 格式加载到 Amazon S3。这些 table 在 AWS MySQL 托管实例中的几个 databases/schemas 中很常见。代码应该从每个模式(有一组公共的 tables)并行复制数据。
我正在使用读取 API PySpark SQL 连接到 MySQL 实例并读取每个 table 的数据以获取模式并将结果数据帧写入S3 使用 write API 作为 Parquet 文件。我 运行 在数据库中的每个 table 循环中,如下面的代码所示:
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver", mysql_db_properties['driver']) \
.option("url", row.database_connection_url) \
.option("dbtable", select_sql) \
.option("user", username) \
.option("password", password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
df.write.parquet(s3_path, mode="append")
我想知道如何在 EMR 集群中将此代码并行扩展到多个数据库 运行。请给我建议 suitable 方法。如果需要更多详细信息,请告诉我。
您的 list_of_databases
未并行化。要进行并行处理,您应该并行化列表并使用 foreach
或 spark.
在 EMR 中打开并发选项并为每个 table 发送 EMR 步骤,或者您可以使用 Spark 的公平调度程序,它可以在内部并行处理作业,只需对代码进行少量修改。
我可以提出两个解决方案:
1.简单的方法
一次向您的 EMR 提交多个作业(每个数据库一个作业)。如果监控有问题,只需将失败日志写入 S3 或 HDFS。
2。需要更改一些代码
您可以尝试使用线程来并行化从每个数据库中提取的数据。我可以展示如何操作的示例,但您可能需要进行更多更改以适合您的用例。
示例实现:
import threading
def load_data_to_s3(databases_df):
db_query_properties = config['mysql-query']
auto_id_values = config['mysql-auto-id-values']
for row in databases_df.collect():
for table in db_query_properties.keys():
last_recorded_id_value = auto_id_values[table]
select_sql = "select * from {}.{} where id>{}".format(row.database_name, table, last_recorded_id_value)
df = spark.read.format("jdbc") \
.option("driver", mysql_db_properties['driver']) \
.option("url", row.database_connection_url) \
.option("dbtable", select_sql) \
.option("user", username) \
.option("password", password) \
.load()
s3_path = 's3a://{}/{}/{}'.format(s3_bucket, database_dir, table)
df.write.parquet(s3_path, mode="append")
threads = [threading.Thread(target=load_data_to_s3, args=(db) for db in databases_df]
for t in threads:
t.start()
for t in threads:
t.join()
此外,请确保使用 set('spark.scheduler.mode', 'FAIR')
属性 将调度程序更改为 FAIR。这将为您的每个数据库创建一个线程。如果要并行控制线程数运行,相应修改for循环。
此外,如果您想从程序中创建新作业,请将您的 SparkSession 与参数一起传递。