使用胶水(Python/Pyspark)通过配置文件循环从源到 s3 的多个表?
loop through multiple tables from source to s3 using glue (Python/Pyspark) through configuration file?
我正在寻找使用胶水将关系数据库中的多个 tables 摄取到 s3。 table 详细信息存在于配置文件中。配置文件是一个 json 文件。如果有一个代码可以遍历多个 table 名称并将这些 table 提取到 s3 中,将会很有帮助。胶水脚本写在 python (pyspark)
这是配置文件的示例:
{"main_key":{
"source_type": "rdbms",
"source_schema": "DATABASE",
"source_table": "DATABASE.Table_1",
}}
只需编写一个普通的 for 循环来遍历您的数据库配置,然后按照 Spark JDBC documentation 依次连接到它们中的每一个。
假设您的 Glue 作业可以连接到数据库并且已将 Glue 连接添加到其中。这是从我的脚本中提取的一个示例,它执行类似的操作,您需要更新适用于您的数据库的 jdbc url 格式,这个使用 sql 服务器,实现细节用于获取配置文件,遍历项目等
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from datetime import datetime
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
jdbc_url = f"jdbc:sqlserver://{hostname}:{port};databaseName={db_name}"
connection_details = {
"user": 'db_user',
"password": 'db_password',
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
tables_config = get_tables_config_from_s3_as_dict()
date_partition = datetime.today().strftime('%Y%m%d')
write_date_partition = f'year={date_partition[0:4]}/month={date_partition[4:6]}/day={date_partition[6:8]}'
for key, value in tables_config.items():
table = value['source_table']
df = spark.read.jdbc(url=jdbc_url, table=table, properties=connection_details)
write_path = f's3a://bucket-name/{table}/{write_date_partition}'
df.write.parquet(write_path)
我正在寻找使用胶水将关系数据库中的多个 tables 摄取到 s3。 table 详细信息存在于配置文件中。配置文件是一个 json 文件。如果有一个代码可以遍历多个 table 名称并将这些 table 提取到 s3 中,将会很有帮助。胶水脚本写在 python (pyspark)
这是配置文件的示例:
{"main_key":{
"source_type": "rdbms",
"source_schema": "DATABASE",
"source_table": "DATABASE.Table_1",
}}
只需编写一个普通的 for 循环来遍历您的数据库配置,然后按照 Spark JDBC documentation 依次连接到它们中的每一个。
假设您的 Glue 作业可以连接到数据库并且已将 Glue 连接添加到其中。这是从我的脚本中提取的一个示例,它执行类似的操作,您需要更新适用于您的数据库的 jdbc url 格式,这个使用 sql 服务器,实现细节用于获取配置文件,遍历项目等
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from datetime import datetime
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
jdbc_url = f"jdbc:sqlserver://{hostname}:{port};databaseName={db_name}"
connection_details = {
"user": 'db_user',
"password": 'db_password',
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
tables_config = get_tables_config_from_s3_as_dict()
date_partition = datetime.today().strftime('%Y%m%d')
write_date_partition = f'year={date_partition[0:4]}/month={date_partition[4:6]}/day={date_partition[6:8]}'
for key, value in tables_config.items():
table = value['source_table']
df = spark.read.jdbc(url=jdbc_url, table=table, properties=connection_details)
write_path = f's3a://bucket-name/{table}/{write_date_partition}'
df.write.parquet(write_path)