从另一个数据中选择的信息创建新的数据框

Create new dataframe from selected information from another datama

我有一个具有以下架构的数据框:

root
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- lat: Long (nullable = true)
 |-- lon: Long (nullable = true)
 |-- nds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: long (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- ref: long (nullable = true)
 |    |    |-- role: string (nullable = true)

我想创建一个新数据框 res,其中我 select 来自列 tags 的特定数据。我需要来自 key=placekey=populationvalues。新数据框应具有以下架构:

val schema = StructType(
               Array(
                 StructField("place", StringType),
                 StructField("population", LongType)
               )
             )

我完全不知道该怎么做。我尝试复制第一个数据框,然后 select 列,但这没有用。

有人有解决方案吗?

让我们调用您的原始数据框 df。你可以像这样提取你想要的信息

import org.apache.spark.sql.functions.sql.col

val data = df
  .select("tags")
  .where(
    df("tags")("key") isin (List("place", "population"): _*)
  )
  .select(
    col("tags")("value")
  )
  .collect()
  .toList

这会给你一个 List[Row] 可以用你的模式转换成另一个数据框

import scala.collection.JavaConversions.seqAsJavaList

sparkSession.createDataFrame(seqAsJavaList[Row](data), schema)

给定以下简化输入:

val df = Seq(
  (1L, Map("place" -> "home", "population" -> "1", "name" -> "foo")),
  (2L, Map("place" -> "home", "population" -> "4", "name" -> "foo")),
  (3L, Map("population" -> "3")),
  (4L, Map.empty[String, String])
).toDF("id", "tags")

您想 select 使用方法 map_filter 的值来过滤映射以仅包含您想要的键,然后调用 map_values 来获取这些条目。 map_values returns 一个数组,所以需要用explode_outer 来展平数据。我们在这里使用 explode_outer 是因为您的条目可能既没有地点也没有人口,或者只有两者之一。一旦数据采用我们可以轻松使用的形式,我们只需 select 所需结构中的字段。

我已将 id 列留在其中,因此当您 运行 示例时,您可以看到我们不会删除缺少数据的条目。


val r = df.select(
    col("id"),
    explode_outer(map_values(map_filter(col("tags"), (k,_) => k === "place"))) as "place",
    map_values(map_filter(col("tags"), (k,_) => k === "population")) as "population"
  ).withColumn("population", explode_outer(col("population")))
  .select(
    col("id"),
    array(
      struct(
        col("place"),
        col("population") cast LongType as "population"
      ) as "place_and_population"
    ) as "data"
  )

给出:

root
 |-- id: long (nullable = false)
 |-- data: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- place: string (nullable = true)
 |    |    |-- population: long (nullable = true)

+---+--------------+
| id|          data|
+---+--------------+
|  1|   [{home, 1}]|
|  2|   [{home, 4}]|
|  3|   [{null, 3}]|
|  4|[{null, null}]|
+---+--------------+

您可以直接在 map 类型的列上应用所需的键来提取值,然后根据需要转换和重命名列,如下所示:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

val result = dataframe.select(
  col("tags")("place").as("place"),
  col("tags")("population").cast(LongType).as("population")
)

具有以下 tags 列:

+------------------------------------------------+
|tags                                            |
+------------------------------------------------+
|{place -> A, population -> 32, another_key -> X}|
|{place -> B, population -> 64, another_key -> Y}|
+------------------------------------------------+

您得到以下结果:

+-----+----------+
|place|population|
+-----+----------+
|A    |32        |
|B    |64        |
+-----+----------+

具有以下架构:

root
 |-- place: string (nullable = true)
 |-- population: long (nullable = true)