如何使用 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?
How do I query a JDBC database within AWS Glue using a WHERE clause with PySpark?
我有一个自己编写的 Glue 脚本和一个存储在 Glue 目录中的 JDBC 连接。我不知道如何使用 PySpark 从存储在我的 JDBC 连接指向的 RDS 中的 MySQL 数据库执行 select 语句。我还使用 Glue Crawler 来推断我有兴趣查询的 RDS table 的架构。如何使用 WHERE 子句查询 RDS 数据库?
我查看了 DynamicFrameReader 和 GlueContext Class 的文档,但似乎都没有指出我正在寻找的方向。
这取决于你想做什么。比如你想做一个select * from table where <conditions>
,有两个选项:
假设您创建了一个爬网程序并将源插入到您的 AWS Glue 作业中,如下所示:
# Read data from database
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "students", redshift_tmp_dir = args["TempDir"])
- AWS 胶水
# Select the needed fields
selectfields1 = SelectFields.apply(frame = datasource0, paths = ["user_id", "full_name", "is_active", "org_id", "org_name", "institution_id", "department_id"], transformation_ctx = "selectfields1")
filter2 = Filter.apply(frame = selectfields1, f = lambda x: x["org_id"] in org_ids, transformation_ctx="filter2")
- PySpark + AWS 胶水
# Change DynamicFrame to Spark DataFrame
dataframe = DynamicFrame.toDF(datasource0)
# Create a view
dataframe.createOrReplaceTempView("students")
# Use SparkSQL to select the fields
dataframe_sql_df_dim = spark.sql("SELECT user_id, full_name, is_active, org_id, org_name, institution_id, department_id FROM assignments WHERE org_id in (" + org_ids + ")")
# Change back to DynamicFrame
selectfields = DynamicFrame.fromDF(dataframe_sql_df_dim, glueContext, "selectfields2")
我有一个自己编写的 Glue 脚本和一个存储在 Glue 目录中的 JDBC 连接。我不知道如何使用 PySpark 从存储在我的 JDBC 连接指向的 RDS 中的 MySQL 数据库执行 select 语句。我还使用 Glue Crawler 来推断我有兴趣查询的 RDS table 的架构。如何使用 WHERE 子句查询 RDS 数据库?
我查看了 DynamicFrameReader 和 GlueContext Class 的文档,但似乎都没有指出我正在寻找的方向。
这取决于你想做什么。比如你想做一个select * from table where <conditions>
,有两个选项:
假设您创建了一个爬网程序并将源插入到您的 AWS Glue 作业中,如下所示:
# Read data from database
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "students", redshift_tmp_dir = args["TempDir"])
- AWS 胶水
# Select the needed fields
selectfields1 = SelectFields.apply(frame = datasource0, paths = ["user_id", "full_name", "is_active", "org_id", "org_name", "institution_id", "department_id"], transformation_ctx = "selectfields1")
filter2 = Filter.apply(frame = selectfields1, f = lambda x: x["org_id"] in org_ids, transformation_ctx="filter2")
- PySpark + AWS 胶水
# Change DynamicFrame to Spark DataFrame
dataframe = DynamicFrame.toDF(datasource0)
# Create a view
dataframe.createOrReplaceTempView("students")
# Use SparkSQL to select the fields
dataframe_sql_df_dim = spark.sql("SELECT user_id, full_name, is_active, org_id, org_name, institution_id, department_id FROM assignments WHERE org_id in (" + org_ids + ")")
# Change back to DynamicFrame
selectfields = DynamicFrame.fromDF(dataframe_sql_df_dim, glueContext, "selectfields2")