How to convert multiple arrays into multiple columns in Spark with Scala
I am new to Scala. I am trying to read a JSON file containing nested documents, which I convert into a Spark table to access their inner values. It returns the following schema:
root
|-- id: array (nullable = true)
| |-- element: long (containsNull = true)
|-- column1: array (nullable = true)
| |-- element: double (containsNull = true)
|-- column2: array (nullable = true)
| |-- element: double (containsNull = true)
|-- column3: array (nullable = true)
| |-- element: double (containsNull = true)
|-- column4: array (nullable = true)
| |-- element: double (containsNull = true)
+--------------------+--------------------+--------------------+--------------------+--------------------+
| id | column1 | column2 | column3 | column4 |
+--------------------+--------------------+--------------------+--------------------+--------------------+
|[1163903, 1135067...|[3.7049873, 3.084...|[3.8597548, 4.188...|[1.6563705, 1.609...|[3.6857932, 3.190...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
This is not what I expected, so I tried exploding all the columns, but that did not return what I expected either:
val exploded = selectedAttributes.columns.foldLeft(selectedAttributes) {
  (df, column) => df.withColumn(column, explode(col(column)))
}
+-------+-----------------+-------------------+-------------------+----------------+
| id | column1 | column2 | column3 | column4 |
+-------+-----------------+-------------------+-------------------+----------------+
|1163903| 3.7049873| 3.8597548| 1.6563705| 3.6857932|
|1163903| 3.7049873| 3.8597548| 1.6563705| 3.190083|
|1163903| 3.7049873| 3.8597548| 1.6563705| 1.990814|
|1163903| 3.7049873| 3.8597548| 1.6563705| 2.319732|
|1163903| 3.7049873| 3.8597548| 1.6563705| 3.3546507|
|1163903| 3.7049873| 3.8597548| 1.6563705| 2.370629|
|1163903| 3.7049873| 3.8597548| 1.6563705| null|
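For context on why that happens (a minimal sketch with a hypothetical two-column toy DataFrame, not your data): chaining `explode` over several array columns produces a Cartesian product, because each `explode` multiplies the existing rows by the length of the next array. That is why every `id` above ends up paired with every `column4` value.

```scala
// Assumes a SparkSession named `spark` (e.g. in spark-shell)
import spark.implicits._
import org.apache.spark.sql.functions.{col, explode}

// One row with two array columns of length 2 each
val toy = Seq((Array(1, 2), Array("a", "b"))).toDF("xs", "ys")

val crossed = toy
  .withColumn("xs", explode(col("xs")))   // 1 row -> 2 rows
  .withColumn("ys", explode(col("ys")))   // 2 rows -> 4 rows

crossed.show()
// 4 rows: (1,a), (1,b), (2,a), (2,b) -- a cross product, not a zip
```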
So I decided to collect the information on the driver (which I don't think is the best solution) and build the DataFrame myself, zipping the arrays and creating a Dataset, but that did not work either. Like this:
val zipFeatures = id zip column1 zip column2 zip column3 zip column4
case class dataset(id: Int, column1: Double, column2: Double, column3: Double, column4: Double)
val rowsOfFeatures = zipFeatures map {
case ((((id, column1), column2), column3), column4) =>
dataset(id, column1, column2, column3, column4)
}
spark.createDataset(rowsOfFeatures).toDF()
This gives me better results; however, I don't think I can hold all of that information on the driver.
Here is the expected output:
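As a side note on why the driver-side approach also misbehaves on rows with missing values: Scala's `zip` silently truncates to the shorter collection, so trailing elements of the longer arrays are dropped instead of being paired with null. A minimal sketch with plain collections (hypothetical values, not your data):

```scala
val ids     = Seq(86684, 66284, 1652)
val column3 = Seq(3.9849327, 4.774783)  // one element short

// zip stops at the shorter length: the id 1652 is silently dropped
val zipped = ids zip column3
// zipped == Seq((86684, 3.9849327), (66284, 4.774783))

// zipAll keeps every id, padding the gap with a default instead
val padded = ids.map(Option(_)).zipAll(column3.map(Option(_)), None, None)
// padded == Seq((Some(86684), Some(3.9849327)),
//               (Some(66284), Some(4.774783)),
//               (Some(1652),  None))
```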
|id |column1 | column2 | column3 | column4 |
|1163903| 3.7049873| 3.8597548| 1.6563705| 3.6857932|
|1135067| 3.0849733| 4.1883473| 1.6097081| 3.190083|
|1136137| 3.415591| 3.12623| 1.7889535| 1.990814|
| 1873| 2.6446266| 3.9076807| 2.0752525| 2.319732|
|1130327| 3.85075| 4.857642| 2.192937| 3.3546507|
| 1879| 2.7091007| 3.8000894| 2.0292222| 2.370629|
| 86684| 4.414381| 3.9849327| null| null|
| 66284| 3.3164778| 4.774783| 1.9173387| 3.1792257|
| 1652| 3.0772924| 3.4006166| 1.7305527| 2.9725764|
|1128385| 4.321163| 3.835489| null| null|
Any help is greatly appreciated!
If you are using Spark 2.4 or above, you can use the arrays_zip function together with explode to get the result you want, as shown below:
// Created a DataFrame similar to yours using the code below
val columnNames = List("id","col1","col2","col3","col4")
val arr = Seq((
  Array("1163903", "1135067", "1136137", "1873", "1130327", "1879", "86684", "66284", "1652", "1128385"),
  Array("3.7049873", "3.0849733", "3.415591", "2.6446266", "3.85075", "2.7091007", "4.414381", "3.3164778", "3.0772924", "4.321163"),
  Array("3.8597548", "4.1883473", "3.12623", "3.9076807", "4.857642", "3.8000894", "3.9849327", "4.774783", "3.4006166", "3.835489"),
  Array("1.6563705", "1.6097081", "1.7889535", "2.0752525", "2.192937", "2.0292222", "", "1.9173387", "1.7305527"),
  Array("3.6857932", "3.190083", "1.990814", "2.319732", "3.3546507", "2.370629", "", "3.1792257", "2.9725764")))
val df = sc.parallelize(arr).toDF(columnNames: _*)
df.printSchema
df.show
// Use arrays_zip and explode to get the result you want, as follows
val df2 = df
  .withColumn("newCol", arrays_zip(col("id"), col("col1"), col("col2"), col("col3"), col("col4")))
  .withColumn("newCol", explode(col("newCol")))
  .selectExpr("newCol.id as id", "newCol.col1 as col1", "newCol.col2 as col2",
    "newCol.col3 as col3", "newCol.col4 as col4")
df2.printSchema
df2.show
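If you are on a Spark version before 2.4 (where `arrays_zip` is unavailable), a commonly used alternative is to explode one array together with its position via `posexplode`, then index the remaining arrays by that position. A hedged sketch along those lines (untested against your exact schema; `df` is the frame built above):

```scala
import org.apache.spark.sql.functions.{col, posexplode}

// posexplode emits one (pos, id) pair per element of the id array;
// indexing the other arrays by pos yields null automatically
// whenever an array is shorter than the id array
val df3 = df
  .select(col("col1"), col("col2"), col("col3"), col("col4"),
    posexplode(col("id")).as(Seq("pos", "id")))
  .selectExpr("id", "col1[pos] as col1", "col2[pos] as col2",
    "col3[pos] as col3", "col4[pos] as col4")

df3.show
```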