如何将多个列转换规则从 XML 文件传递到 Spark 中的 Dataframe?
how to pass multipleColumns transformation rules from XML file to Dataframe in Spark?
我有 XML 文件,其中包含我需要 运行 在 DataFrame
上使用 withColumn
函数的所有转换,例如以下:
我如何将它应用到 DataFrame
.
我有一个使用 Scala
ToolBox
和 runTmirror
编写的代码,它在内部编译代码并且 运行 这些规则优于 DataFrame
。 对于少于 100 列的情况,它工作得很好。但现在要求已更改,列数已从 80 增加到 210,因此此代码因 Whosebug error
而失败。这是 Scala 2.11 的未决问题 (https://github.com/scala/bug/issues/10026)
所以我想使用任何 Spark 实用程序而不是 Scala ToolBox。我也尝试过使用 foldLeft
但它也会出错,因为我无法将列函数(如 lit
或 concat
等)作为列类型传递。
XML 规则文件:
<?xml version="1.0" encoding="utf-8" ?>
- <root>
- <columns>
- <column name="col1">
- <![CDATA[ data("columnA")
]]>
</column>
- <column name="col2">
- <![CDATA[lit("ABC")
]]>
</column>
- <column name="col3">
- <![CDATA[concat(col(columnC),col(columnD))
]]>
</column>
</column>
- <column name="col4">
- <![CDATA[ regexp_replace(regexp_replace(regexp_replace(col("ColumnE"), "\,|\)", "") , "\(", "-") , "^(-)$", "0").cast("double")
]]>
</column>
- <column name="col5">
- <![CDATA[ lit("")
]]>
</column>
.
.
.
.
.
</columns>
</root>
我需要使用的操作
df.withColumn("col1",data("columnA")).withColumn("col2",lit("ABC")).withColumn("col3",concat(col(columnC), col(columnD))).withColumn("col4",regexp_replace(regexp_replace(regexp_replace(col("ColumnE"), "\,|\)", "") , "\(", "-") , "^(-)$", "0").cast("double"))withColumn("col5",lit("")).........
我使用的版本:
Scala 2.11.12
Spark 2.4.3
我试图从 Spark
获得解决方案,但除了 Spark Sql
之外没有得到任何解决方案。
但是我的规则非常复杂,使用 Spark Sql
会使它变得更复杂,所以我坚持使用 ToolBox 的相同逻辑(作为现有系统工作)并解决了 100 列的问题,如下所示:
- 首先,我从 XML 中读取所有规则,并通过 concat 与 .withColumn 生成 CodeRule(与现有代码相同)
- 然后我检查字符串
".withColumn"
是否大于 100(比如 133 列),然后简单地将 CodeRule
分成两部分( 从 1 到 99,FirstRule和秒规则从 100 到 133,LastRule) 并按以下两步应用这些规则
- 首先在输入
DataFrame
上应用 FirstRule
并得到 DataFrame
.
- 在它之后我将结果
Dataframe
与 LastRule
并得到最终的 DataFrame
.
- 它非常适合我的情况。
我有 XML 文件,其中包含我需要 运行 在 DataFrame
上使用 withColumn
函数的所有转换,例如以下:
我如何将它应用到 DataFrame
.
我有一个使用 Scala
ToolBox
和 runTmirror
编写的代码,它在内部编译代码并且 运行 这些规则优于 DataFrame
。 对于少于 100 列的情况,它工作得很好。但现在要求已更改,列数已从 80 增加到 210,因此此代码因 Whosebug error
而失败。这是 Scala 2.11 的未决问题 (https://github.com/scala/bug/issues/10026)
所以我想使用任何 Spark 实用程序而不是 Scala ToolBox。我也尝试过使用 foldLeft
但它也会出错,因为我无法将列函数(如 lit
或 concat
等)作为列类型传递。
XML 规则文件:
<?xml version="1.0" encoding="utf-8" ?>
- <root>
- <columns>
- <column name="col1">
- <![CDATA[ data("columnA")
]]>
</column>
- <column name="col2">
- <![CDATA[lit("ABC")
]]>
</column>
- <column name="col3">
- <![CDATA[concat(col(columnC),col(columnD))
]]>
</column>
</column>
- <column name="col4">
- <![CDATA[ regexp_replace(regexp_replace(regexp_replace(col("ColumnE"), "\,|\)", "") , "\(", "-") , "^(-)$", "0").cast("double")
]]>
</column>
- <column name="col5">
- <![CDATA[ lit("")
]]>
</column>
.
.
.
.
.
</columns>
</root>
我需要使用的操作
df.withColumn("col1",data("columnA")).withColumn("col2",lit("ABC")).withColumn("col3",concat(col(columnC), col(columnD))).withColumn("col4",regexp_replace(regexp_replace(regexp_replace(col("ColumnE"), "\,|\)", "") , "\(", "-") , "^(-)$", "0").cast("double"))withColumn("col5",lit("")).........
我使用的版本:
Scala 2.11.12
Spark 2.4.3
我试图从 Spark
获得解决方案,但除了 Spark Sql
之外没有得到任何解决方案。
但是我的规则非常复杂,使用 Spark Sql
会使它变得更复杂,所以我坚持使用 ToolBox 的相同逻辑(作为现有系统工作)并解决了 100 列的问题,如下所示:
- 首先,我从 XML 中读取所有规则,并通过 concat 与 .withColumn 生成 CodeRule(与现有代码相同)
- 然后我检查字符串
".withColumn"
是否大于 100(比如 133 列),然后简单地将CodeRule
分成两部分( 从 1 到 99,FirstRule和秒规则从 100 到 133,LastRule) 并按以下两步应用这些规则 - 首先在输入
DataFrame
上应用FirstRule
并得到DataFrame
. - 在它之后我将结果
Dataframe
与LastRule
并得到最终的DataFrame
. - 它非常适合我的情况。