我可以在 Glue 中将 RDD 转换为 DataFrame 吗?

Can I convert RDD to DataFrame in Glue?

我的 lambda 函数通过 boto3 触发粘合作业 glue.start_job_run

这是我的胶水作业脚本

from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max

conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)

args = getResolvedOptions(sys.argv,['s3_target_path_key','s3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']

inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = f"s3a://glu-job-final-juiceb"

print(bucket, fileName)

rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x : (x.split(" ")[0], 1)).reduceByKey(add)
df = rdd.toDF(schema=('rawEntities string, Count int'))
df = df.withColumn("Entities", regexp_extract(col("rawEntities"),'[^!".?@:,\'*…_()]+',0))
df = df.filter(col("Entities") != "")
df = df.select("Entities","Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)

Glue 作业错误消息是“AttributeError:'PipelinedRDD' 对象没有属性 'toDF'

谷歌搜索后,我注意到在 glue 中“toDF”表示 DynamicFrame 到 DataFrame。

不是RDD转DataFrame的意思

如何在胶水中将 RDD 转换为 DataFrame?

您不能使用 toDF() 定义模式类型。通过使用 toDF() 方法,我们无法控制模式自定义。话虽如此,使用 createDataFrame() 方法我们可以完全控制模式自定义。

看下面的逻辑-

from pyspark.sql.types import *

schema = StructType([ StructField('rawEntities', StringType()),  StructField('Count' , IntegerType())])

df = spark.createDataFrame(data=<your rdd>, schema = schema)