我可以在 Glue 中将 RDD 转换为 DataFrame 吗?
Can I convert RDD to DataFrame in Glue?
我的 lambda 函数通过 boto3 触发粘合作业 glue.start_job_run
这是我的胶水作业脚本
from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max
conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)
args = getResolvedOptions(sys.argv,['s3_target_path_key','s3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = f"s3a://glu-job-final-juiceb"
print(bucket, fileName)
rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x : (x.split(" ")[0], 1)).reduceByKey(add)
df = rdd.toDF(schema=('rawEntities string, Count int'))
df = df.withColumn("Entities", regexp_extract(col("rawEntities"),'[^!".?@:,\'*…_()]+',0))
df = df.filter(col("Entities") != "")
df = df.select("Entities","Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)
Glue 作业错误消息是“AttributeError:'PipelinedRDD' 对象没有属性 'toDF'
谷歌搜索后,我注意到在 glue 中“toDF”表示 DynamicFrame 到 DataFrame。
不是RDD转DataFrame的意思
如何在胶水中将 RDD 转换为 DataFrame?
您不能使用 toDF()
定义模式类型。通过使用 toDF()
方法,我们无法控制模式自定义。话虽如此,使用 createDataFrame()
方法我们可以完全控制模式自定义。
看下面的逻辑-
from pyspark.sql.types import *
schema = StructType([ StructField('rawEntities', StringType()), StructField('Count' , IntegerType())])
df = spark.createDataFrame(data=<your rdd>, schema = schema)
我的 lambda 函数通过 boto3 触发粘合作业 glue.start_job_run
这是我的胶水作业脚本
from awsglue.utils import getResolvedOptions
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from operator import add
from pyspark.sql.functions import col, regexp_extract, max
conf = SparkConf().setAppName("pyspark-etl")
sc = SparkContext.getOrCreate(conf=conf)
args = getResolvedOptions(sys.argv,['s3_target_path_key','s3_target_path_bucket'])
bucket = args['s3_target_path_bucket']
fileName = args['s3_target_path_key']
inputFilePath = f"s3a://{bucket}/{fileName}"
finalFilePath = f"s3a://glu-job-final-juiceb"
print(bucket, fileName)
rdd = sc.textFile(inputFilePath)
rdd = rdd.flatMap(lambda x: x.split(" ")).map(lambda x : (x.split(" ")[0], 1)).reduceByKey(add)
df = rdd.toDF(schema=('rawEntities string, Count int'))
df = df.withColumn("Entities", regexp_extract(col("rawEntities"),'[^!".?@:,\'*…_()]+',0))
df = df.filter(col("Entities") != "")
df = df.select("Entities","Count").groupBy("Entities").agg(max("Count").alias("Count"))
df.write.mode("append").options(header='True').parquet(finalFilePath)
Glue 作业错误消息是“AttributeError:'PipelinedRDD' 对象没有属性 'toDF'
谷歌搜索后,我注意到在 glue 中“toDF”表示 DynamicFrame 到 DataFrame。
不是RDD转DataFrame的意思
如何在胶水中将 RDD 转换为 DataFrame?
您不能使用 toDF()
定义模式类型。通过使用 toDF()
方法,我们无法控制模式自定义。话虽如此,使用 createDataFrame()
方法我们可以完全控制模式自定义。
看下面的逻辑-
from pyspark.sql.types import *
schema = StructType([ StructField('rawEntities', StringType()), StructField('Count' , IntegerType())])
df = spark.createDataFrame(data=<your rdd>, schema = schema)