Expand tuple into dataframe using Scala Spark

I have a tuple with 2 elements that looks like this:

Tuple2("String1, String2", ArrayList("String3", "String4"))
=> the first element is a single string whose values are comma-separated
=> the second element is an ArrayList containing a list of strings

I would like a dataframe like this:

Col1        Col2        Col3
1           String1     String3
2           String1     String4
3           String2     String3
4           String2     String4

TL;DR

import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id, split}

df
    // `split` "String1, String2" into separate values, then create a row per value using `explode`
    .withColumn("Col2", explode(split(col("_1"), ", ")))
    // create a row per value in the list: "String3", "String4"
    .withColumn("Col3", explode(col("_2")))
    // now that we have our 4 rows, add a new column with an incrementing number
    .withColumn("Col1", monotonically_increasing_id() + 1)
    // only keep the columns we care about
    .select("Col1", "Col2", "Col3")
    .show(false)

Full answer

Starting with your example:

val tuple2 = Tuple2("String1, String2", List("String3", "String4"))

and converting it into a DataFrame:

import spark.implicits._ // needed for `.toDF`

val df = List(tuple2).toDF("_1", "_2")

df.show(false)

Gives:

+----------------+------------------+
|_1              |_2                |
+----------------+------------------+
|String1, String2|[String3, String4]|
+----------------+------------------+

Now we're ready for the transformation:

import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id, split}

df
    // `split` "String1, String2" into separate values, then create a row per value using `explode`
    .withColumn("Col2", explode(split(col("_1"), ", ")))
    // create a row per value in the list: "String3", "String4"
    .withColumn("Col3", explode(col("_2")))
    // now that we have our 4 rows, add a new column with an incrementing number
    .withColumn("Col1", monotonically_increasing_id() + 1)
    // only keep the columns we care about
    .select("Col1", "Col2", "Col3")
    .show(false)

Gives:

+----+-------+-------+
|Col1|Col2   |Col3   |
+----+-------+-------+
|1   |String1|String3|
|2   |String1|String4|
|3   |String2|String3|
|4   |String2|String4|
+----+-------+-------+

Extra reading for more detail

It's worth noting that the order of operations here is key:

  1. First we `explode` "String1" and "String2" into their own rows:
df
    .withColumn("Col2", explode(split(col("_1"), ", ")))
    .select("Col2")
    .show(false)

Gives:

+-------+
|Col2   |
+-------+
|String1|
|String2|
+-------+

We've gone from our original single row to two rows.

  2. Then we `explode` "String3" and "String4":
df
    .withColumn("Col2", explode(split(col("_1"), ", ")))
    .withColumn("Col3", explode(col("_2")))
    .select("Col2", "Col3")
    .show(false)

Gives:

+-------+-------+
|Col2   |Col3   |
+-------+-------+
|String1|String3|
|String1|String4|
|String2|String3|
|String2|String4|
+-------+-------+
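Conceptually, the two `explode` calls compute a cross product of the two value lists: every value split out of `_1` gets paired with every value in `_2`. A plain-Scala sketch of the same logic (no Spark required, names here are just illustrative):

```scala
// Plain-Scala sketch of what `split` plus the two `explode`s compute:
// each value from the first column paired with each value from the second.
val col2Values = "String1, String2".split(", ").toList
val col3Values = List("String3", "String4")

val rows = for {
  v2 <- col2Values
  v3 <- col3Values
} yield (v2, v3)
// rows == List((String1,String3), (String1,String4), (String2,String3), (String2,String4))
```

This also makes it obvious why we end up with 2 × 2 = 4 rows.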
  3. Finally we add the incrementing count. If we had done this any earlier, we would have copied the same value across multiple rows.

For example:

df
    // here we add `Col1` to a Dataset of only one row! So we only have the value `1`
    .withColumn("Col1", monotonically_increasing_id() + 1)
    // here we explode row 1, copying the value of `Col1`
    .withColumn("Col2", explode(split(col("_1"), ", ")))
    .withColumn("Col3", explode(col("_2")))
    .select("Col1", "Col2", "Col3")
    .show(false)

Gives:

+----+-------+-------+
|Col1|Col2   |Col3   |
+----+-------+-------+
|1   |String1|String3|
|1   |String1|String4|
|1   |String2|String3|
|1   |String2|String4|
+----+-------+-------+
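One caveat worth knowing: `monotonically_increasing_id` guarantees values that are increasing and unique, but not consecutive, so on real multi-partition data `Col1` can contain large gaps. If you need strictly consecutive numbering, one alternative (a sketch, not part of the original answer) is `row_number` over a `Window`. Note that a window with no `partitionBy` pulls all rows into a single partition, which is fine for this small example but can be costly at scale:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, explode, row_number, split}

df
    .withColumn("Col2", explode(split(col("_1"), ", ")))
    .withColumn("Col3", explode(col("_2")))
    // `row_number` assigns strictly consecutive 1, 2, 3, ... within the window;
    // an un-partitioned window means one global ordering over all rows
    .withColumn("Col1", row_number().over(Window.orderBy("Col2", "Col3")))
    .select("Col1", "Col2", "Col3")
    .show(false)
```

This produces the same 4-row output as above, with `Col1` guaranteed to run 1 through 4 regardless of partitioning.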