Exploding multiple array columns in Spark for a changing input schema
Below is my sample schema.
|-- provider: string (nullable = true)
|-- product: string (nullable = true)
|-- asset_name: string (nullable = true)
|-- description: string (nullable = true)
|-- creation_date: string (nullable = true)
|-- provider_id: string (nullable = true)
|-- asset: string (nullable = true)
|-- asset_clas: string (nullable = true)
|-- Actors: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Actors_Display: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Audio_Type: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Billing_ID: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Bit_Rate: array (nullable = true)
| |-- element: string (containsNull = false)
|-- CA_Rating: array (nullable = true)
| |-- element: string (containsNull = false)
I need to explode all the array-type columns. I have about 80+ columns, and the columns keep changing. I am currently using explode_outer(arrays_zip(...)):
val df = sourcedf.select($"provider", $"asset_name", $"description", $"creation_date", $"provider_id", $"asset_id", $"asset_class", $"product", $"eligible_platform", $"actors", $"category",
  explode_outer(arrays_zip($"Actors_Display", $"Audio_Type", $"Billing_ID", $"Bit_Rate", $"CA_Rating")))

val parsed_output = df.select(col("provider"), col("asset_name"), col("description"), col("creation_date"), col("product"),
  col("provider_id"), col("asset_id"), col("asset_class"),
  col("col.Actors_Display"), col("col.Audio_Type"), col("col.Billing_ID"), col("col.Bit_Rate"), col("col.CA_Rating"))
Using the above, I am able to get the output, but it only works for that one particular file. In my case, new columns get added frequently. So, is there any function that can explode multiple columns for a changing schema, and also select the non-array columns from the file? Could someone give an example?

Note: only the array columns keep changing; the rest stay the same.

Below is the sample data:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ADIL>
<Meta>
<AMS Asset_Name="asd" Provider="Level" Product="MOTD" Version_Major="1" Version_Minor="0" Description="ZXC" Creation_Date="2009-12-03" Provider_ID="qwer.com" Asset_ID="A12we" Asset_Class="package"/>
<App_Data App="MOD" Name="Actors" Value="CableLa1.1"/>
<App_Data App="MOD" Name="Actors_Display" Value="RTY"/>
<App_Data App="MOD" Name="Audio_Type" Value="FGH"/>
</Meta>
<Asset>
<Meta>
<AMS Asset_Name="bnm" Provider="Level Film" Product="MOTD" Version_Major="1" Version_Minor="0" Description="bnj7" Creation_Date="2009-12-03" Provider_ID="levelfilm.com" Asset_ID="DDDB0610072533182333" Asset_Class="title"/>
<App_Data App="rt" Name="Billing_ID" Value="2020-12-29T00:00:00"/>
<App_Data App="MOD" Name="Bit_Rate" Value="2021-12-29T23:59:59"/>
<App_Data App="MOD" Name="CA_Rating" Value="16.99"/>
</Meta>
<Asset>
<Meta>
<AMS Asset_Name="atysd" Provider="Level1" Product="MOTD2" Version_Major="1" Version_Minor="0" Description="ZXCY" Creation_Date="2009-12-03" Provider_ID="qweDFtrr.com" Asset_ID="A12FGwe" Asset_Class="review"/>
This is the XML data. Initially, this data is parsed so that every Name attribute value becomes a column name and every Value attribute value becomes that column's value. The XML has repeated tags, so the final parsed result ends up in array columns; I used collect_list at the end of my parsing logic.

Here is sample output after parsing:
+------------------+--------------+--------------+----------+-----------+
|Actors            |Actors_Display|Audio_Type    |Billing_ID|Bit_rate   |
+------------------+--------------+--------------+----------+-----------+
|["movie","cinema"]|["Dolby 5.1"] |["High","low"]|["GAR15"] |["15","14"]|
+------------------+--------------+--------------+----------+-----------+
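For context, a minimal sketch of the pivot-plus-collect_list step described above, assuming the XML has already been flattened into one row per App_Data tag with a hypothetical (asset_id, Name, Value) shape; the grouping key and intermediate layout are assumptions, not the exact parsing logic used:

import org.apache.spark.sql.functions.collect_list
import spark.implicits._

// Hypothetical intermediate: one row per <App_Data> element
val appData = Seq(
  ("A12we", "Actors", "movie"),
  ("A12we", "Actors", "cinema"),
  ("A12we", "Actors_Display", "Dolby 5.1"),
  ("A12we", "Audio_Type", "High"),
  ("A12we", "Audio_Type", "low")
).toDF("asset_id", "Name", "Value")

// Name attribute values become column names; repeated tags
// collapse into arrays via collect_list
val parsed = appData
  .groupBy("asset_id")
  .pivot("Name")
  .agg(collect_list("Value"))

parsed.show(false)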
Assuming you want to explode all the ArrayType columns (otherwise, filter accordingly):
import org.apache.spark.sql.functions.{col, explode_outer, arrays_zip}
import org.apache.spark.sql.types.{StructField, ArrayType}
import spark.implicits._

val df = Seq(
  (1, "xx", Seq(10, 20), Seq("a", "b"), Seq("p", "q")),
  (2, "yy", Seq(30, 40), Seq("c", "d"), Seq("r", "s"))
).toDF("c1", "c2", "a1", "a2", "a3")

// Collect the names of all ArrayType columns from the schema
val arrCols = df.schema.fields
  .collect{ case StructField(name, _: ArrayType, _, _) => name }
  .map(col)

// Everything else is a non-array column
val otherCols = df.columns.map(col) diff arrCols

// Zip the arrays element-wise, explode once, then flatten the struct
df.withColumn("arr_zip", explode_outer(arrays_zip(arrCols: _*)))
  .select(otherCols.toList ::: $"arr_zip.*" :: Nil: _*)
  .show
+---+---+---+---+---+
| c1| c2| a1| a2| a3|
+---+---+---+---+---+
| 1| xx| 10| a| p|
| 1| xx| 20| b| q|
| 2| yy| 30| c| r|
| 2| yy| 40| d| s|
+---+---+---+---+---+
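Because arrCols is derived from the DataFrame schema at runtime, any array columns added later are picked up automatically, and the fixed non-array columns never have to be listed by hand. Applied to the parsed output above (a sketch, assuming it lives in a DataFrame named sourcedf), the same pattern would be:

val arrCols = sourcedf.schema.fields
  .collect{ case StructField(name, _: ArrayType, _, _) => name }
  .map(col)
val otherCols = sourcedf.columns.map(col) diff arrCols

val exploded = sourcedf
  .withColumn("arr_zip", explode_outer(arrays_zip(arrCols: _*)))
  .select(otherCols.toList ::: $"arr_zip.*" :: Nil: _*)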
下面是我的示例架构。
|-- provider: string (nullable = true)
|-- product: string (nullable = true)
|-- asset_name: string (nullable = true)
|-- description: string (nullable = true)
|-- creation_date: string (nullable = true)
|-- provider_id: string (nullable = true)
|-- asset: string (nullable = true)
|-- asset_clas: string (nullable = true)
|-- Actors: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Actors_Display: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Audio_Type: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Billing_ID: array (nullable = true)
| |-- element: string (containsNull = false)
|-- Bit_Rate: array (nullable = true)
| |-- element: string (containsNull = false)
|-- CA_Rating: array (nullable = true)
| |-- element: string (containsNull = false)
我需要展开所有数组类型 columns.I 大约有 80 多个列,并且列不断变化。 我目前正在使用 explode(array_zip)
val df= sourcedf.select($"provider",$"asset_name",$"description",$"creation_date",$"provider_id",$"asset_id",$"asset_class",$"product",$"provider_id",$"eligible_platform",$"actors",$"category",
explode_outer(arrays_zip($"Actors_Display",$"Audio_Type",$"Billing_ID",$"Bit_Rate",$"CA_Rating")
val parsed_output = df.select(col("provider"),("asset_name"),col("description"),col("creation_date"),col("product"),col("provider"),
col("povider_id"),col("asset_id"),col("asset_class"),
col("col.Actors_Display"),col("col.Audio_Type"),col("col.Billing_ID"),col("col.Bit_Rate"),col("col.CA_Rating"))
通过使用,以上我能够得到输出。但这仅适用于一个特定文件。在我的例子中,会经常添加新列。那么,是否有任何函数可以分解多列以更改模式以及 select 文件中的非数组列。 有人可以举个例子吗
注意:只有数组的列不断变化,其余不变。
下面是示例数据
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ADIL>
<Meta>
<AMS Asset_Name="asd" Provider="Level" Product="MOTD" Version_Major="1" Version_Minor="0" Description="ZXC" Creation_Date="2009-12-03" Provider_ID="qwer.com" Asset_ID="A12we" Asset_Class="package"/>
<App_Data App="MOD" Name="Actors" Value="CableLa1.1"/>
<App_Data App="MOD" Name="Actors_Display" Value="RTY"/>
<App_Data App="MOD" Name="Audio_Type" Value="FGH"/>
</Meta>
<Asset>
<Meta>
<AMS Asset_Name="bnm" Provider="Level Film" Product="MOTD" Version_Major="1" Version_Minor="0" Description="bnj7" Creation_Date="2009-12-03" Provider_ID="levelfilm.com" Asset_ID="DDDB0610072533182333" Asset_Class="title"/>
App_Data App="rt" Name="Billing_ID" Value="2020-12-29T00:00:00"/>
<App_Data App="MOD" Name="Bit_Rate" Value="2021-12-29T23:59:59"/>
<App_Data App="MOD" Name="CA_Rating" Value="16.99"/>
</Meta>
<Asset>
<Meta>
<AMS Asset_Name="atysd" Provider="Level1" Product="MOTD2" Version_Major="1" Version_Minor="0" Description="ZXCY" Creation_Date="2009-12-03" Provider_ID="qweDFtrr.com" Asset_ID="A12FGwe" Asset_Class="review"/>
这是xml数据。最初,解析此数据并将所有名称属性值转换为列名,并将所有“值”属性值转换为列名的值。这个 XML 有重复的标签,所以解析后的最终结果在数组列中,我在解析逻辑的末尾使用了 collect_list。
这是解析后的示例输出。
+-------------------+-------------------+-----------------+------------+--------------+
|Actors |Actors_Display |Audio_Type |Billing_ID |Bit_rate
+-------------+---------------+-----------------------------------------+------------
|["movie","cinema",] | ["Dolby 5.1"] | ["High", "low"] | ["GAR15"]| ["15","14"]
+-------------+-----+-------------------+-----------------+--------------+----------
假设您要分解所有 ArrayType 列(否则,相应地进行过滤):
val df = Seq(
(1, "xx", Seq(10, 20), Seq("a", "b"), Seq("p", "q")),
(2, "yy", Seq(30, 40), Seq("c", "d"), Seq("r", "s"))
).toDF("c1", "c2", "a1", "a2", "a3")
import org.apache.spark.sql.types.{StructField, ArrayType}
val arrCols = df.schema.fields
.collect{case StructField(name, _: ArrayType, _, _) => name}
.map(col)
val otherCols = df.columns.map(col) diff arrCols
df.withColumn("arr_zip", explode_outer(arrays_zip(arrCols: _*)))
.select(otherCols.toList ::: $"arr_zip.*" :: Nil: _*)
.show
+---+---+---+---+---+
| c1| c2| a1| a2| a3|
+---+---+---+---+---+
| 1| xx| 10| a| p|
| 1| xx| 20| b| q|
| 2| yy| 30| c| r|
| 2| yy| 40| d| s|
+---+---+---+---+---+