Scala spark:从模式中提取列

Scala spark: extract columns from a schema

我的架构如下所示:

 |-- contributors: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: array (valueContainsNull = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)

我想要一个包含 keynameid

列的数据框

我已使用以下代码获取 nameid 但如何获取列 key

df.select(explode(col("contributors")))
  .select(explode(col("value")))
  .select(col("col.*"))

更新

我尝试将第一个解决方案应用于以下架构,但编译器不喜欢它。我想得到 value._namesubgenres.element.value._name

         |-- mainGenre: struct (nullable = true)
         |    |-- value: struct (nullable = true)
         |    |    |-- _name: string (nullable = true)
         |    |-- subgenres: array (nullable = true)
         |    |    |-- element: struct (containsNull = true)
         |    |    |    |-- value: struct (nullable = true)
         |    |    |    |    |-- type: string (nullable = true))
         |    |    |    |    |-- _name: string (nullable = true)
         |    |    |    |-- name: map (nullable = true)
         |    |    |    |    |-- key: string
         |    |    |    |    |-- value: string (valueContainsNull = true)

我尝试用 value._name 创建一个变量,然后像这样将它插入到我的第二个变量中。

val col_mainGenre_name = df_r.select(col("mainGenre.*"))
                             .select(col("value.*"))
                             .select(col("_name"))
                             .drop("readableName")
                             .drop("description")

val df_exploded = df_r.select(col("mainGenre.*"))
                      .select(col_mainGenre_name, col("value.*"))

您可以在第二个和第三个 select 中添加 key 列。 select dataframe 方法接受多个列作为参数。

你应该修改你的代码如下:

import org.apache.spark.sql.functions.{col, explode}

df.select(explode(col("contributors")))
  .select(col("key"), explode(col("value")))
  .select(col("key"), col("col.*"))

具有以下 contributors 输入列:

+--------------------------------------------------------------------------------------------+
|contributors                                                                                |
+--------------------------------------------------------------------------------------------+
|{key1 -> [{type11, name11, id11}, {type12, name12, id12}], key2 -> [{type21, name21, id21}]}|
|{key3 -> [{type31, name31, id31}, {type32, name32, id32}], key4 -> []}                      |
+--------------------------------------------------------------------------------------------+

您得到以下输出:

+----+------+------+----+
|key |type  |name  |id  |
+----+------+------+----+
|key1|type11|name11|id11|
|key1|type12|name12|id12|
|key2|type21|name21|id21|
|key3|type31|name31|id31|
|key3|type32|name32|id32|
+----+------+------+----+

如果您只想保留 nameid 列的值,您还应该将最后一个 select 修改为 select 只有 col.idcol.name 列:

import org.apache.spark.sql.functions.{col, explode}

df.select(explode(col("contributors")))
  .select(col("key"), explode(col("value")))
  .select(col("key"), col("col.name"), col("col.id"))

使用相同的 contributors 列输入,您将获得预期的输出:

+----+------+----+
|key |name  |id  |
+----+------+----+
|key1|name11|id11|
|key1|name12|id12|
|key2|name21|id21|
|key3|name31|id31|
|key3|name32|id32|
+----+------+----+