从另一个数据框中的选定信息创建一个新的数据框(具有不同的模式)

Create a new dataframe (with different schema) from selected information from another dataframe

我有一个数据框,其中标签列包含不同的 key->values. 我尝试过滤掉 values 信息,其中 key=name。过滤掉的信息应该放在一个新的dataframe中。

初始 df 具有以下架构:

root
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- nds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: long (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- ref: long (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- visible: boolean (nullable = true)

我想要一个 newdf 架构:

root
 |-- place: string (nullable = true)
 |-- num_evacuees string (nullable = true)

我应该如何过滤?我尝试了很多方法,我尝试至少有一个普通的过滤器。但是每次,过滤器的结果每次都是一个空数据框。例如:

val newdf = df.filter($"tags"("key") contains "name")
val newdf = df.where(places("tags")("key") === "name")

我尝试了很多方法,但 none 有效 我应该如何做合适的过滤器

想法是提取映射列(标签)的键,然后使用array_contains检查名为“name”的键。

import org.apache.spark.sql.functions._
val newdf = df.filter(array_contains(map_keys($"tags), "name"))

您可以通过以下方式获得您想要的结果:

         val df = Seq(
                 (1L, Map("sf" -> "100")),
                 (2L, Map("ny" -> "200"))
               ).toDF("id", "tags")
               
               val resultDf = df
                 .select(explode(map_filter(col("tags"), (k, _) => k === "ny")))
                 .withColumnRenamed("key", "place")
                 .withColumnRenamed("value", "num_evacuees")
               
               resultDf.printSchema
               resultDf.show

将显示:

root
 |-- place: string (nullable = false)
 |-- num_evacuees: string (nullable = true)

+-----+------------+
|place|num_evacuees|
+-----+------------+
|   ny|         200|
+-----+------------+

关键思想是使用 map_filter 到 select 地图中您想要的字段,然后 explode 将地图变成两列(keyvalue) 然后您可以重命名以使 DataFrame 符合您的规范。

上面的例子假设你想得到一个单一的值来演示这个想法。 map_filter 使用的 lambda 函数可以尽可能复杂。它的签名map_filter(expr: Column, f: (Column, Column) => Column): Column说明只要你return一个Column它就会开心

如果您想过滤大量条目,您可以这样做:

val resultDf = df
  .withColumn("filterList", array("sf", "place_n"))
  .select(explode(map_filter(col("tags"), (k, _) => array_contains(col("filterList"), k))))