NotNull condition is not working for withColumn condition in Spark DataFrame (Scala)

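I am trying to add a column when it is found in the XML schema, but I do not want to add it when the column is not present in the schema. This is what I am doing; I think I am doing something wrong when checking the condition.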

  val temp = tempNew1
  .withColumn("BookMark", when($"AsReportedItem.fs:BookMark".isNotNull or $"AsReportedItem.fs:BookMark" =!= "", 0))
  .withColumn("DocByteOffset", when($"AsReportedItem.fs:DocByteOffset".isNotNull or $"AsReportedItem.fs:DocByteOffset" =!= "", 0))
  .withColumn("DocByteLength", when($"AsReportedItem.fs:DocByteLength".isNotNull or $"AsReportedItem.fs:DocByteLength" =!= "", 0))
  .withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription".isNotNull or $"AsReportedItem.fs:EditedDescription" =!= "", 0))
  .withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription._VALUE".isNotNull or $"AsReportedItem.fs:EditedDescription._VALUE" =!= "", 0))
  .withColumn("EditedDescription_languageId", when($"AsReportedItem.fs:EditedDescription._languageId".isNotNull or $"AsReportedItem.fs:EditedDescription._languageId" =!= "", 0))
  .withColumn("ReportedDescription", when($"AsReportedItem.fs:ReportedDescription._VALUE".isNotNull or $"AsReportedItem.fs:ReportedDescription._VALUE" =!= "", 0))
  .withColumn("ReportedDescription_languageId", when($"AsReportedItem.fs:ReportedDescription._languageId".isNotNull or $"AsReportedItem.fs:ReportedDescription._languageId" =!= "", 0))
  .withColumn("FinancialAsReportedLineItemName_languageId", when($"FinancialAsReportedLineItemName._languageId".isNotNull or $"FinancialAsReportedLineItemName._languageId" =!= "", 0))
  .withColumn("FinancialAsReportedLineItemName", when($"FinancialAsReportedLineItemName._VALUE".isNotNull or $"FinancialAsReportedLineItemName._VALUE" =!= "", 0))
  .withColumn("PeriodPermId_objectTypeId", when($"PeriodPermId._objectTypeId".isNotNull or $"PeriodPermId._objectTypeId" =!= "", 0))
  .withColumn("PeriodPermId", when($"PeriodPermId._VALUE".isNotNull or $"PeriodPermId._VALUE" =!= "", 0))
  .drop($"AsReportedItem").drop($"AsReportedItem")

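This works fine when the column is present, but I get an error when the column is not present in tempNew1.

Basically, I do not want to call withColumn at all if the tag is not found in the schema.

I am missing something here. Please help me identify the issue.

The error I am getting is below: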

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'AsReportedItem.fs:BookMark' given input columns: [IsAsReportedCurrencySetManually,

This is also what I tried:

    def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
 val temp = tempNew1.withColumn("BookMark", when(hasColumn(tempNew1,"AsReportedItem.fs:BookMark") == true, $"AsReportedItem.fs:BookMark"))

But I could not get it to work fully.

The following works, but how can I write it for all columns?

val temp = if (hasColumn(tempNew1, "AsReportedItem")) {
      tempNew1
        .withColumn("BookMark", $"AsReportedItem.fs:BookMark")
        .withColumn("DocByteOffset", $"AsReportedItem.fs:DocByteOffset")
        .withColumn("DocByteLength", $"AsReportedItem.fs:DocByteLength")
        .withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription")
        .withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription._VALUE")
        .withColumn("EditedDescription_languageId", $"AsReportedItem.fs:EditedDescription._languageId")
        .withColumn("ReportedDescription", $"AsReportedItem.fs:ReportedDescription._VALUE")
        .withColumn("ReportedDescription_languageId", $"AsReportedItem.fs:ReportedDescription._languageId")
        .withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
        .withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
        .withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
        .withColumn("PeriodPermId", $"PeriodPermId._VALUE")
        .drop($"AsReportedItem")
    } else {
      tempNew1
        .withColumn("BookMark", lit(null))
        .withColumn("DocByteOffset", lit(null))
        .withColumn("DocByteLength", lit(null))
        .withColumn("EditedDescription", lit(null))
        .withColumn("EditedDescription", lit(null))
        .withColumn("EditedDescription_languageId", lit(null))
        .withColumn("ReportedDescription", lit(null))
        .withColumn("ReportedDescription_languageId", lit(null))
        .withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
        .withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
        .withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
        .withColumn("PeriodPermId", $"PeriodPermId._VALUE")
        .drop($"AsReportedItem")

    }

Adding the schema of the main DataFrame:

root
 |-- DataPartition: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- PeriodId: long (nullable = true)
 |-- SourceId: long (nullable = true)
 |-- FinancialStatementLineItem_lineItemId: long (nullable = true)
 |-- FinancialStatementLineItem_lineItemInstanceKey: long (nullable = true)
 |-- StatementCurrencyId: long (nullable = true)
 |-- StatementTypeCode: string (nullable = true)
 |-- uniqueFundamentalSet: long (nullable = true)
 |-- AuditID: string (nullable = true)
 |-- EstimateMethodCode: string (nullable = true)
 |-- EstimateMethodId: long (nullable = true)
 |-- FinancialAsReportedLineItemName: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- FinancialStatementLineItemSequence: long (nullable = true)
 |-- FinancialStatementLineItemValue: double (nullable = true)
 |-- FiscalYear: long (nullable = true)
 |-- IsAnnual: boolean (nullable = true)
 |-- IsAsReportedCurrencySetManually: boolean (nullable = true)
 |-- IsCombinedItem: boolean (nullable = true)
 |-- IsDerived: boolean (nullable = true)
 |-- IsExcludedFromStandardization: boolean (nullable = true)
 |-- IsFinal: boolean (nullable = true)
 |-- IsTotal: boolean (nullable = true)
 |-- PeriodEndDate: string (nullable = true)
 |-- PeriodPermId: struct (nullable = true)
 |    |-- _VALUE: long (nullable = true)
 |    |-- _objectTypeId: long (nullable = true)
 |-- ReportedCurrencyId: long (nullable = true)
 |-- StatementSectionCode: string (nullable = true)
 |-- StatementSectionId: long (nullable = true)
 |-- StatementSectionIsCredit: boolean (nullable = true)
 |-- SystemDerivedTypeCode: string (nullable = true)
 |-- SystemDerivedTypeCodeId: long (nullable = true)
 |-- Unit: double (nullable = true)
 |-- UnitEnumerationId: long (nullable = true)
 |-- FFAction|!|: string (nullable = true)
 |-- PartitionYear: long (nullable = true)
 |-- PartitionStatement: string (nullable = true)

Adding the schema as it looks when the column is present:

 |-- uniqueFundamentalSet: long (nullable = true)
 |-- AsReportedItem: struct (nullable = true)
 |    |-- fs:BookMark: string (nullable = true)
 |    |-- fs:DocByteLength: long (nullable = true)
 |    |-- fs:DocByteOffset: long (nullable = true)
 |    |-- fs:EditedDescription: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _languageId: long (nullable = true)
 |    |-- fs:ItemDisplayedNegativeFlag: boolean (nullable = true)
 |    |-- fs:ItemDisplayedValue: double (nullable = true)
 |    |-- fs:ItemScalingFactor: long (nullable = true)
 |    |-- fs:ReportedDescription: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _languageId: long (nullable = true)
 |    |-- fs:ReportedValue: double (nullable = true)
 |-- EstimateMethodCode: string (nullable = true)
 |-- EstimateMethodId: long (nullable = true)
 |-- FinancialAsReportedLineItemName: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- FinancialLineItemSource: long (nullable = true)

Posting this as an answer, since it became too long for a comment.

Suppose you have a collection of columns you want to add:

val cols = Seq("BookMark")

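You need to call withColumn repeatedly, starting from the original DataFrame and assigning the result to a new DataFrame each time. There is a functional operation that does exactly this, called fold: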

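import org.apache.spark.sql.functions.{col, lit}
val result = cols.foldLeft(tempNew1) { (df, name) =>
  val path = s"AsReportedItem.fs:$name"
  // df.columns lists only top-level names, so probe the nested path with hasColumn from the question
  df.withColumn(name, if (hasColumn(df, path)) col(path) else lit(null))
}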

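fold takes the first argument (tempNew1 in your case) and calls the supplied function for each element of cols, assigning the result to a new DataFrame each time.

I will show you a general way of applying the logic on the AsReportedItem struct column (I have commented the code for clarity).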
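To see how the intermediate result is threaded through a fold without needing a Spark session, here is a minimal sketch that uses a plain Map in place of a DataFrame. The names FoldSketch, addColumn and source are hypothetical stand-ins invented for this illustration; None plays the role of lit(null).

```scala
object FoldSketch {
  // Hypothetical stand-in for withColumn: returns a NEW map and leaves the input untouched.
  def addColumn(row: Map[String, Option[String]], name: String, value: Option[String]): Map[String, Option[String]] =
    row + (name -> value)

  // Hypothetical source fields; DocByteLength is deliberately missing.
  val source: Map[String, String] = Map("fs:BookMark" -> "b1", "fs:DocByteOffset" -> "42")

  val cols: Seq[String] = Seq("BookMark", "DocByteOffset", "DocByteLength")

  // foldLeft starts from the empty map and passes each intermediate result to the next step.
  val result: Map[String, Option[String]] =
    cols.foldLeft(Map.empty[String, Option[String]]) { (acc, name) =>
      addColumn(acc, name, source.get(s"fs:$name")) // None when the field is absent
    }
}
```

Here FoldSketch.result contains all three names, with DocByteLength mapped to None because the source has no such field. The DataFrame version behaves the same way: each step receives the DataFrame produced by the previous step.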

import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StructField, StructType}

// column names required in the output, even when the element is missing from the AsReportedItem struct column
val requiredAsReportedItemColumns = Array("BookMark", "DocByteOffset", "DocByteLength", "EditedDescription", "EditedDescription_languageId", "ReportedDescription", "ReportedDescription_languageId")

// recursively collect the dotted paths of all leaf fields, descending into nested struct fields
def getFields(parent: String, schema: StructType): Seq[String] = schema.fields.flatMap {
  case StructField(name, t: StructType, _, _) => getFields(parent + name + ".", t)
  case StructField(name, _, _, _) => Seq(s"$parent$name")
}

// if the struct column is present, get its fields (nested structs included) and strip the leading "fs:" prefix
val AsReportedItemColumns = if (tempNew1.columns.contains("AsReportedItem")) getFields("", tempNew1.select("AsReportedItem.*").schema).toArray.map(x => x.substring(3, x.length)) else Array.empty[String]

// difference between the required column names and the elements present in the AsReportedItem struct column
val notInAsReportedItemColumns = requiredAsReportedItemColumns.diff(AsReportedItemColumns.map(x => x.replace(".", "")))

// one top-level column per element present in the struct; the dot is removed from nested names (e.g. EditedDescription._VALUE becomes EditedDescription_VALUE)
val temp_for_AsReportedItem = AsReportedItemColumns.foldLeft(tempNew1){(tempdf, name) => tempdf.withColumn(name.replace(".", ""), col(s"AsReportedItem.fs:$name"))}
// populate nulls for the required columns that are not present in the AsReportedItem struct column, then drop the struct
val final_AsReportedItem = notInAsReportedItemColumns.foldLeft(temp_for_AsReportedItem){(tempdf, name)=> tempdf.withColumn(name, lit(null))}.drop("AsReportedItem")

Apply the same logic to the remaining two struct columns, FinancialAsReportedLineItemName and PeriodPermId, but on the transformed DataFrame, i.e. final_AsReportedItem, not on tempNew1.
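One possible way to handle those two struct columns is sketched below. It is not the original author's code: FlattenSketch, flatten, fields and sample are hypothetical names, and the small sample DataFrame only stands in for final_AsReportedItem. The sketch assumes classic (non-Connect) Spark, where df(path) resolves eagerly and throws for an unknown path, which is what the hasColumn check from the question relies on.

```scala
import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct}

object FlattenSketch {
  // Same check as in the question: true when the (possibly nested) path resolves.
  def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess

  // For each (outputName, nestedPath) pair, copy the nested field when it resolves,
  // otherwise add a null column. Pairs are applied in order.
  def flatten(df: DataFrame, fields: Seq[(String, String)]): DataFrame =
    fields.foldLeft(df) { case (acc, (out, path)) =>
      acc.withColumn(out, if (hasColumn(acc, path)) col(path) else lit(null))
    }

  // The _VALUE pair comes last for each struct because it overwrites the struct column itself.
  val fields: Seq[(String, String)] = Seq(
    "FinancialAsReportedLineItemName_languageId" -> "FinancialAsReportedLineItemName._languageId",
    "FinancialAsReportedLineItemName" -> "FinancialAsReportedLineItemName._VALUE",
    "PeriodPermId_objectTypeId" -> "PeriodPermId._objectTypeId",
    "PeriodPermId" -> "PeriodPermId._VALUE"
  )

  // Hypothetical sample standing in for final_AsReportedItem; PeriodPermId is absent on purpose.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(("Revenue", 505L)).toDF("v", "l")
      .select(struct(col("v").as("_VALUE"), col("l").as("_languageId")).as("FinancialAsReportedLineItemName"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("flatten-sketch").getOrCreate()
    flatten(sample(spark), fields).printSchema()
    spark.stop()
  }
}
```

The order of the pairs matters: once FinancialAsReportedLineItemName has been overwritten by its _VALUE field, the path FinancialAsReportedLineItemName._languageId no longer resolves, so the _languageId pair has to be applied first. In real use you would call flatten(final_AsReportedItem, fields) instead of using the sample.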

Credit goes to