pyspark - 根据另一个计算列的计算值更新列

pyspark - Updating a column based on a calculated value from another calculated column

  1. 以下代码将数据从 csv 文件加载到数据框中 df。这个df对应的SQLtablemyTable已经存在,会从这个df中导入数据到myTable中。
  2. myTable 有几列。 Column5 和 Column6 存在于 myTable 中,并且是 calculated 列。但是这些列在 csv 文件中不存在。
  3. Column5 的值是根据 Column1 的值计算的。 Column6 的值是根据 Column5 的计算值计算的。这些值分别由 testFunction1 和 testFunction2 计算得出。
  4. 该代码适用于 Column5。但是在下面代码的最后一行 .withColumn("Column6", newFunction2(df.Column5)) 抛出以下错误。

问题:我这里可能做错了什么。以及我们如何修复错误。注意:如果我从 myTable 中删除 Column6,并删除下面代码的最后一行,代码成功地将数据加载到 myTable 中,column5 中的数据填充(按预期)来自 Column1 的计算值。

错误:

AttributeError: 'DataFrame' object has no attribute 'Column6'

代码:

from pyspark.sql.types import StringType
from pyspark.sql import functions as F

df = spark.read.csv(".......dfs.core.windows.net/myDataFile.csv", header="true", inferSchema="false")

def testFunction1(Col1Value):
  #do some calculation on column1 value and return it to column5
  return mystr1

def testFunction2(value):
  # do some calculation on column5 value and return it to column6
  return mystr2

newFunction1 = F.udf(testFunction1, StringType())
newFunction2 = F.udf(testFunction2, StringType())

df2 = df.withColumn("Column5", newFunction1(df.Column1)) \
      .withColumn("Column6", newFunction2(df.Column5)) 

问题出在您创建 df2 时。您正在读取数据框 (df) 并创建列“Column5”。然后引用第二行的列。但是“Column5”在 df 中还不存在。如果将最后一部分分解为两条语句,例如下面的代码,应该可以解决问题:

df2 = df.withColumn("Column5", newFunction1(df.Column1))
df3 = df2.withColumn("Column6", newFunction2(df.Column5))