在 PySpark 中更新数据框的某些行或创建新的数据框
Update some rows of a dataframe or create new dataframe in PySpark
我是 PySpark 的新手,我的 objective 是在 AWS Glue 中使用 PySpark 脚本用于:
- 正在 Glue 中从输入文件读取数据帧 => 完成
- 更改满足条件的某些行的列 => 面临问题
- 将同一架构上的更新数据帧写入 S3 => 完成
这个任务看起来很简单,但我找不到完成它的方法,而且我更改代码时仍然面临不同的不同问题。
到目前为止,我的代码如下所示:
Transform2.printSchema() # input schema after reading
Transform2 = Transform2.toDF()
def updateRow(row):
# my logic to update row based on a global condition
#if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
return row
LocalTransform3 = [] # creating new dataframe from Transform2
for row in Transform2.rdd.collect():
row = row.asDict()
row = updateRow(row)
LocalTransform3.append(row)
print(len(LocalTransform3))
columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)
Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count())
我尝试使用多种方法,例如:
- 使用 map 更新 lambda 中的现有行
- 使用 collect()
- 使用 createDataFrame() 创建新数据框
但在以下步骤中遇到错误:
- 无法创建新的更新 rdd
- 无法使用现有列从 rdd 创建新数据框
我在不同阶段遇到的一些 Glue 错误:
- ValueError: 推断后无法确定某些类型
- ValueError: 部分类型无法通过前100行判断,请抽样重试
- 调用 z:org.apache.spark.api.python.PythonRDD.runJob 时出错。追溯(最近一次通话最后一次):
感谢任何有效的代码片段或帮助。
from pyspark.sql.functions import col, lit, when
Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))
这应该适用于您的 use-case。
我是 PySpark 的新手,我的 objective 是在 AWS Glue 中使用 PySpark 脚本用于:
- 正在 Glue 中从输入文件读取数据帧 => 完成
- 更改满足条件的某些行的列 => 面临问题
- 将同一架构上的更新数据帧写入 S3 => 完成
这个任务看起来很简单,但我找不到完成它的方法,而且我更改代码时仍然面临不同的不同问题。
到目前为止,我的代码如下所示:
Transform2.printSchema() # input schema after reading
Transform2 = Transform2.toDF()
def updateRow(row):
# my logic to update row based on a global condition
#if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
return row
LocalTransform3 = [] # creating new dataframe from Transform2
for row in Transform2.rdd.collect():
row = row.asDict()
row = updateRow(row)
LocalTransform3.append(row)
print(len(LocalTransform3))
columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)
Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count())
我尝试使用多种方法,例如:
- 使用 map 更新 lambda 中的现有行
- 使用 collect()
- 使用 createDataFrame() 创建新数据框
但在以下步骤中遇到错误:
- 无法创建新的更新 rdd
- 无法使用现有列从 rdd 创建新数据框
我在不同阶段遇到的一些 Glue 错误:
- ValueError: 推断后无法确定某些类型
- ValueError: 部分类型无法通过前100行判断,请抽样重试
- 调用 z:org.apache.spark.api.python.PythonRDD.runJob 时出错。追溯(最近一次通话最后一次):
感谢任何有效的代码片段或帮助。
from pyspark.sql.functions import col, lit, when
Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))
这应该适用于您的 use-case。