在 PySpark 中更新数据框的某些行或创建新的数据框

Update some rows of a dataframe or create new dataframe in PySpark

我是 PySpark 的新手,我的 objective 是在 AWS Glue 中使用 PySpark 脚本用于:

  1. 正在 Glue 中从输入文件读取数据帧 => 完成
  2. 更改满足条件的某些行的列 => 面临问题
  3. 将同一架构上的更新数据帧写入 S3 => 完成

这个任务看起来很简单,但我找不到完成它的方法,而且我更改代码时仍然面临不同的不同问题。

到目前为止,我的代码如下所示:

Transform2.printSchema() # input schema after reading 
Transform2 = Transform2.toDF()
def updateRow(row):
    # my logic to update row based on a global condition 
    #if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
    return row

LocalTransform3 = [] # creating new dataframe from Transform2 
for row in Transform2.rdd.collect():
    row = row.asDict()
    row = updateRow(row)
    LocalTransform3.append(row)
print(len(LocalTransform3))

columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)

Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count()) 

我尝试使用多种方法,例如:

但在以下步骤中遇到错误:

我在不同阶段遇到的一些 Glue 错误:

感谢任何有效的代码片段或帮助。

from pyspark.sql.functions import col, lit, when

Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))

这应该适用于您的 use-case。