如何将多个列转换规则从 XML 文件传递​​到 Spark 中的 Dataframe?

how to pass multipleColumns transformation rules from XML file to Dataframe in Spark?

我有 XML 文件,其中包含我需要 运行 在 DataFrame 上使用 withColumn 函数的所有转换,例如以下: 我如何将它应用到 DataFrame.

我有一个使用 Scala ToolBoxrunTmirror 编写的代码,它在内部编译代码并且 运行 这些规则优于 DataFrame对于少于 100 列的情况,它工作得很好。但现在要求已更改,列数已从 80 增加到 210,因此此代码因 Whosebug error 而失败。这是 Scala 2.11 的未决问题 (https://github.com/scala/bug/issues/10026)

所以我想使用任何 Spark 实用程序而不是 Scala ToolBox。我也尝试过使用 foldLeft 但它也会出错,因为我无法将列函数(如 litconcat 等)作为列类型传递。

XML 规则文件:

    <?xml version="1.0" encoding="utf-8" ?> 
    - <root>
    - <columns>
    - <column name="col1">
    - <![CDATA[ data("columnA")        
      ]]> 
      </column>
    - <column name="col2">
    - <![CDATA[lit("ABC")

      ]]> 
      </column>
    - <column name="col3">
    - <![CDATA[concat(col(columnC),col(columnD))      
      ]]> 
      </column>
      </column>
    - <column name="col4">
    - <![CDATA[         regexp_replace(regexp_replace(regexp_replace(col("ColumnE"), "\,|\)", "") , "\(", "-") , "^(-)$", "0").cast("double")

      ]]> 
      </column>
    - <column name="col5">
    - <![CDATA[                 lit("")

      ]]> 
      </column>
.
.
.
.
. 
     </columns>
      </root>

我需要使用的操作

df.withColumn("col1",data("columnA")).withColumn("col2",lit("ABC")).withColumn("col3",concat(col(columnC), col(columnD))).withColumn("col4",regexp_replace(regexp_replace(regexp_replace(col("ColumnE"), "\,|\)", "") , "\(", "-") , "^(-)$", "0").cast("double"))withColumn("col5",lit("")).........

我使用的版本:

Scala 2.11.12

Spark 2.4.3

我试图从 Spark 获得解决方案,但除了 Spark Sql 之外没有得到任何解决方案。 但是我的规则非常复杂,使用 Spark Sql 会使它变得更复杂,所以我坚持使用 ToolBox 的相同逻辑(作为现有系统工作)并解决了 100 列的问题,如下所示:

  1. 首先,我从 XML 中读取所有规则,并通过 concat 与 .withColumn 生成 CodeRule(与现有代码相同)
  2. 然后我检查字符串 ".withColumn" 是否大于 100(比如 133 列),然后简单地将 CodeRule 分成两部分( 从 1 到 99,FirstRule和秒规则从 100 到 133,LastRule) 并按以下两步应用这些规则
  3. 首先在输入 DataFrame 上应用 FirstRule 并得到 DataFrame.
  4. 在它之后我将结果 DataframeLastRule 并得到最终的 DataFrame.
  5. 它非常适合我的情况。