从另一个数据框中的选定信息创建一个新的数据框(具有不同的模式)
Create a new dataframe (with different schema) from selected information from another dataframe
我有一个数据框,其中标签列包含不同的 key->values.
我尝试过滤掉 values
信息,其中 key=name
。过滤掉的信息应该放在一个新的dataframe中。
初始 df
具有以下架构:
root
|-- id: long (nullable = true)
|-- type: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- nds: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ref: long (nullable = true)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- ref: long (nullable = true)
| | |-- role: string (nullable = true)
|-- visible: boolean (nullable = true)
我想要一个 newdf
架构:
root
|-- place: string (nullable = true)
|-- num_evacuees string (nullable = true)
我应该如何过滤?我尝试了很多方法,我尝试至少有一个普通的过滤器。但是每次,过滤器的结果每次都是一个空数据框。例如:
val newdf = df.filter($"tags"("key") contains "name")
val newdf = df.where(places("tags")("key") === "name")
我尝试了很多方法,但 none 有效
我应该如何做合适的过滤器
想法是提取映射列(标签)的键,然后使用array_contains检查名为“name”的键。
import org.apache.spark.sql.functions._
val newdf = df.filter(array_contains(map_keys($"tags), "name"))
您可以通过以下方式获得您想要的结果:
val df = Seq(
(1L, Map("sf" -> "100")),
(2L, Map("ny" -> "200"))
).toDF("id", "tags")
val resultDf = df
.select(explode(map_filter(col("tags"), (k, _) => k === "ny")))
.withColumnRenamed("key", "place")
.withColumnRenamed("value", "num_evacuees")
resultDf.printSchema
resultDf.show
将显示:
root
|-- place: string (nullable = false)
|-- num_evacuees: string (nullable = true)
+-----+------------+
|place|num_evacuees|
+-----+------------+
| ny| 200|
+-----+------------+
关键思想是使用 map_filter
到 select 地图中您想要的字段,然后 explode
将地图变成两列(key
和 value
) 然后您可以重命名以使 DataFrame
符合您的规范。
上面的例子假设你想得到一个单一的值来演示这个想法。 map_filter
使用的 lambda 函数可以尽可能复杂。它的签名map_filter(expr: Column, f: (Column, Column) => Column): Column
说明只要你return一个Column
它就会开心
如果您想过滤大量条目,您可以这样做:
val resultDf = df
.withColumn("filterList", array("sf", "place_n"))
.select(explode(map_filter(col("tags"), (k, _) => array_contains(col("filterList"), k))))
我有一个数据框,其中标签列包含不同的 key->values.
我尝试过滤掉 values
信息,其中 key=name
。过滤掉的信息应该放在一个新的dataframe中。
初始 df
具有以下架构:
root
|-- id: long (nullable = true)
|-- type: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- nds: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ref: long (nullable = true)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- ref: long (nullable = true)
| | |-- role: string (nullable = true)
|-- visible: boolean (nullable = true)
我想要一个 newdf
架构:
root
|-- place: string (nullable = true)
|-- num_evacuees string (nullable = true)
我应该如何过滤?我尝试了很多方法,我尝试至少有一个普通的过滤器。但是每次,过滤器的结果每次都是一个空数据框。例如:
val newdf = df.filter($"tags"("key") contains "name")
val newdf = df.where(places("tags")("key") === "name")
我尝试了很多方法,但 none 有效 我应该如何做合适的过滤器
想法是提取映射列(标签)的键,然后使用array_contains检查名为“name”的键。
import org.apache.spark.sql.functions._
val newdf = df.filter(array_contains(map_keys($"tags), "name"))
您可以通过以下方式获得您想要的结果:
val df = Seq(
(1L, Map("sf" -> "100")),
(2L, Map("ny" -> "200"))
).toDF("id", "tags")
val resultDf = df
.select(explode(map_filter(col("tags"), (k, _) => k === "ny")))
.withColumnRenamed("key", "place")
.withColumnRenamed("value", "num_evacuees")
resultDf.printSchema
resultDf.show
将显示:
root
|-- place: string (nullable = false)
|-- num_evacuees: string (nullable = true)
+-----+------------+
|place|num_evacuees|
+-----+------------+
| ny| 200|
+-----+------------+
关键思想是使用 map_filter
到 select 地图中您想要的字段,然后 explode
将地图变成两列(key
和 value
) 然后您可以重命名以使 DataFrame
符合您的规范。
上面的例子假设你想得到一个单一的值来演示这个想法。 map_filter
使用的 lambda 函数可以尽可能复杂。它的签名map_filter(expr: Column, f: (Column, Column) => Column): Column
说明只要你return一个Column
它就会开心
如果您想过滤大量条目,您可以这样做:
val resultDf = df
.withColumn("filterList", array("sf", "place_n"))
.select(explode(map_filter(col("tags"), (k, _) => array_contains(col("filterList"), k))))