Expand tuple into dataframe using Scala Spark
I have a tuple with 2 elements, like this:
Tuple2("String1, String2", ArrayList("String3", "String4"))
=> The first element is a string of comma-separated string values
=> The second element is an ArrayList containing a list of strings
I want a dataframe like this:
Col1 Col2 Col3
1 String1 String3
2 String1 String4
3 String2 String3
4 String2 String4
TL;DR
import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id, split}
df
// `split` "String1, String2" into separate values, then create a row per value using `explode`
.withColumn("Col2", explode(split(col("_1"), ", ")))
// create a row per value in the list: "String3", "String4"
.withColumn("Col3", explode(col("_2")))
// now that we have our 4 rows, add a new column with an incrementing number
.withColumn("Col1", monotonically_increasing_id() + 1)
// only keep the columns we care about
.select("Col1", "Col2", "Col3")
.show(false)
Full answer
Starting with your example:
val tuple2 = Tuple2("String1, String2", List("String3", "String4"))
and converting it into a DataFrame (toDF needs the SparkSession implicits in scope; assuming a session named spark, as in spark-shell):
import spark.implicits._
val df = List(tuple2).toDF("_1", "_2")
df.show(false)
gives:
+----------------+------------------+
|_1 |_2 |
+----------------+------------------+
|String1, String2|[String3, String4]|
+----------------+------------------+
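A side note: the question mentions ArrayList, but Spark's built-in encoders work with Scala collections (Seq, List, etc.), not java.util.ArrayList. If the data really arrives as an ArrayList, converting it first keeps toDF happy. A minimal sketch, assuming Scala 2.13 (on 2.12 the import is scala.collection.JavaConverters._):

import scala.jdk.CollectionConverters._

// hypothetical: the second element arrives as a java.util.ArrayList, as in the question
val javaList = new java.util.ArrayList[String]()
javaList.add("String3")
javaList.add("String4")
// convert to a Scala List before calling toDF
val tuple2 = Tuple2("String1, String2", javaList.asScala.toList)
val df = List(tuple2).toDF("_1", "_2")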
Now we're ready for the transformation:
import org.apache.spark.sql.functions.{col, explode, monotonically_increasing_id, split}
df
// `split` "String1, String2" into separate values, then create a row per value using `explode`
.withColumn("Col2", explode(split(col("_1"), ", ")))
// create a row per value in the list: "String3", "String4"
.withColumn("Col3", explode(col("_2")))
// now that we have our 4 rows, add a new column with an incrementing number
.withColumn("Col1", monotonically_increasing_id() + 1)
// only keep the columns we care about
.select("Col1", "Col2", "Col3")
.show(false)
gives:
+----+-------+-------+
|Col1|Col2 |Col3 |
+----+-------+-------+
|1 |String1|String3|
|2 |String1|String4|
|3 |String2|String3|
|4 |String2|String4|
+----+-------+-------+
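One caveat worth flagging: monotonically_increasing_id guarantees unique, increasing IDs, but not consecutive ones; on a DataFrame with more than one partition the values can jump, because the partition ID is encoded in the upper bits. It works for the 4-row example above because everything sits in a single partition. If strictly consecutive numbering matters, a row_number over a Window is a safer sketch (Spark will warn that an un-partitioned window pulls all rows onto one partition):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, explode, row_number, split}

// order by the two value columns so the numbering is deterministic
val w = Window.orderBy("Col2", "Col3")
df
  .withColumn("Col2", explode(split(col("_1"), ", ")))
  .withColumn("Col3", explode(col("_2")))
  // row_number starts at 1 and is strictly consecutive
  .withColumn("Col1", row_number().over(w))
  .select("Col1", "Col2", "Col3")
  .show(false)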
Extra reading for more detail
It's worth noting that the order of operations is key:
- First we explode "String1" and "String2" into their own rows:
df
.withColumn("Col2", explode(split(col("_1"), ", ")))
.select("Col2")
.show(false)
gives:
+-------+
|Col2 |
+-------+
|String1|
|String2|
+-------+
We went from our original 1 row to two.
- Then we explode "String3", "String4":
df
.withColumn("Col2", explode(split(col("_1"), ", ")))
.withColumn("Col3", explode(col("_2")))
.select("Col2", "Col3")
.show(false)
gives:
+-------+-------+
|Col2 |Col3 |
+-------+-------+
|String1|String3|
|String1|String4|
|String2|String3|
|String2|String4|
+-------+-------+
- Finally we add the incrementing count. If we did this any earlier, the same value would be copied onto multiple rows.
For example:
df
// here we add `Col1` to a Dataset of only one row! So we only have the value `1`
.withColumn("Col1", monotonically_increasing_id() + 1)
// here we explode row 1, copying the value of `Col1`
.withColumn("Col2", explode(split(col("_1"), ", ")))
.withColumn("Col3", explode(col("_2")))
.select("Col1", "Col2", "Col3")
.show(false)
gives:
+----+-------+-------+
|Col1|Col2 |Col3 |
+----+-------+-------+
|1 |String1|String3|
|1 |String1|String4|
|1 |String2|String3|
|1 |String2|String4|
+----+-------+-------+
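For completeness, the same result can be built without relying on row order at all: posexplode returns each value's position within its array, so Col1 can be computed arithmetically. A sketch (pos1, pos2 and n are illustrative names, not part of the original answer):

import org.apache.spark.sql.functions.{col, posexplode, size, split}

df
  // explode the comma-separated string, keeping each value's position as `pos1`
  .select(col("_2"), posexplode(split(col("_1"), ", ")).as(Seq("pos1", "Col2")))
  // explode the list, keeping each value's position as `pos2`, plus the list size `n`
  .select(col("pos1"), col("Col2"), size(col("_2")).as("n"), posexplode(col("_2")).as(Seq("pos2", "Col3")))
  // global index: outer position * inner list size + inner position + 1
  .withColumn("Col1", col("pos1") * col("n") + col("pos2") + 1)
  .select("Col1", "Col2", "Col3")
  .show(false)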