我如何 运行 SQL SELECT 在 AWS Glue 上在 Spark 中创建 Dataframe?

How do I run SQL SELECT on AWS Glue created Dataframe in Spark?

我在 AWS Glue 中有以下工作,它基本上从一个 table 读取数据并将其提取为 S3 中的 csv 文件,但是我想 运行 对此 [=17] 进行查询=](A Select、SUM 和 GROUPBY)并希望将该输出转换为 CSV,我该如何在 AWS Glue 中执行此操作?我是 Spark 的新手所以请帮助

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = 
"db1", table_name = "dbo1_expdb_dbo_stg_plan", transformation_ctx = 
"datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = 
[("plan_code", "int", "plan_code", "int"), ("plan_id", "int", "plan_id", 
"int")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = 
applymapping1, connection_type = "s3", connection_options = {"path": 
"s3://bucket"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

粘合上下文的“create_dynamic_frame.from_catalog”函数创建了一个动态帧而不是数据帧。并且动态框架不支持执行 sql 查询。

要执行 sql 查询,您首先需要将动态帧转换为数据帧,在 spark 的内存中注册一个临时文件 table,然后在此临时文件上执行 sql 查询 table.

示例代码:

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)

DyF = glueContext.create_dynamic_frame.from_catalog(database="{{database}}", table_name="{{table_name}}")
df = DyF.toDF()
df.registerTempTable('{{name}}')
df = sqlContext.sql('{{your select query with table name that you used for temp table above}}')
df.write.format('{{orc/parquet/whatever}}').partitionBy("{{columns}}").save('path to s3 location')

这就是我首先将胶水动态帧转换为火花数据帧的方法。然后使用 glueContext 对象和 sql 方法进行查询。

spark_dataframe = glue_dynamic_frame.toDF()
spark_dataframe.createOrReplaceTempView("spark_df")

glueContext.sql("""
SELECT * 
FROM spark_df
LIMIT 10
""").show()