Spark 中的关系转换
Relational transformations in Spark
我正在尝试使用 Spark DataSet 来加载相当大的(比方说)人物数据,其中的子集数据如下所示。
|age|maritalStatus| name|sex|
+---+-------------+--------+---+
| 35| M| Joanna| F|
| 25| S|Isabelle| F|
| 19| S| Andy| M|
| 70| M| Robert| M|
+---+-------------+--------+---+
我需要进行关系转换,其中一列从其他列派生其值。
例如,根据每个人记录的 "age" & "sex" 我需要在每个 "name" 属性前面加上 Mr 或 Ms/Mrs。另一个例子,对于一个 "age" 超过 60 岁的人,我需要将他或她标记为老年人(派生列 "seniorCitizen" 为 Y)。
我最终需要转换后的数据如下:
+---+-------------+---------------------------+---+
|age|maritalStatus| name|seniorCitizen|sex|
+---+-------------+---------------------------+---+
| 35| M| Mrs. Joanna| N| F|
| 25| S| Ms. Isabelle| N| F|
| 19| S| Mr. Andy| N| M|
| 70| M| Mr. Robert| Y| M|
+---+-------------+--------+------------------+---+
Spark 提供的大多数转换都是静态的,而不是动态的。例如,如示例 here and here.
中所定义
我正在使用 Spark 数据集,因为我正在从关系数据源加载,但如果您可以建议使用普通 RDD 执行此操作的更好方法,请执行。
您可以使用 withColumn
添加新列,对于 seniorCitizen
使用 where
子句和更新 name
您可以使用用户定义的函数 (udf)
如下
import spark.implicits._
import org.apache.spark.sql.functions._
//create a dummy data
val df = Seq((35, "M", "Joanna", "F"),
(25, "S", "Isabelle", "F"),
(19, "S", "Andy", "M"),
(70, "M", "Robert", "M")
).toDF("age", "maritalStatus", "name", "sex")
// create a udf to update name according to age and sex
val append = udf((name: String, maritalStatus:String, sex: String) => {
if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}"
else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}"
else s"Mr. ${name}"
})
//add two new columns with using withColumn
df.withColumn("name", append($"name", $"maritalStatus", $"sex"))
.withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")).show
输出:
+---+-------------+------------+---+-------------+
|age|maritalStatus| name|sex|seniorCitizen|
+---+-------------+------------+---+-------------+
| 35| M| Mrs. Joanna| F| N|
| 25| S|Ms. Isabelle| F| N|
| 19| S| Mr. Andy| M| N|
| 70| M| Mr. Robert| M| Y|
+---+-------------+------------+---+-------------+
编辑:
这是不使用 UDF 的输出
df.withColumn("name",
when($"sex" === "F", when($"maritalStatus" === "M", concat(lit("Ms. "), df("name"))).otherwise(concat(lit("Ms. "), df("name"))))
.otherwise(concat(lit("Ms. "), df("name"))))
.withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y"))
希望对您有所帮助!
Spark functions 可以帮助您完成工作。您可以组合 when
、concat
、lit
函数,如下所述
val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name"))
.otherwise(when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name"))
.otherwise(when(lower($"sex") === "m", concat(lit("Mr. "), $"name"))))
val updatedDataSet = dataset.withColumn("name", updateName)
.withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N"))
updatedDataSet
是您需要的 dataset
我正在尝试使用 Spark DataSet 来加载相当大的(比方说)人物数据,其中的子集数据如下所示。
|age|maritalStatus| name|sex|
+---+-------------+--------+---+
| 35| M| Joanna| F|
| 25| S|Isabelle| F|
| 19| S| Andy| M|
| 70| M| Robert| M|
+---+-------------+--------+---+
我需要进行关系转换,其中一列从其他列派生其值。 例如,根据每个人记录的 "age" & "sex" 我需要在每个 "name" 属性前面加上 Mr 或 Ms/Mrs。另一个例子,对于一个 "age" 超过 60 岁的人,我需要将他或她标记为老年人(派生列 "seniorCitizen" 为 Y)。
我最终需要转换后的数据如下:
+---+-------------+---------------------------+---+
|age|maritalStatus| name|seniorCitizen|sex|
+---+-------------+---------------------------+---+
| 35| M| Mrs. Joanna| N| F|
| 25| S| Ms. Isabelle| N| F|
| 19| S| Mr. Andy| N| M|
| 70| M| Mr. Robert| Y| M|
+---+-------------+--------+------------------+---+
Spark 提供的大多数转换都是静态的,而不是动态的。例如,如示例 here and here.
中所定义我正在使用 Spark 数据集,因为我正在从关系数据源加载,但如果您可以建议使用普通 RDD 执行此操作的更好方法,请执行。
您可以使用 withColumn
添加新列,对于 seniorCitizen
使用 where
子句和更新 name
您可以使用用户定义的函数 (udf)
如下
import spark.implicits._
import org.apache.spark.sql.functions._
//create a dummy data
val df = Seq((35, "M", "Joanna", "F"),
(25, "S", "Isabelle", "F"),
(19, "S", "Andy", "M"),
(70, "M", "Robert", "M")
).toDF("age", "maritalStatus", "name", "sex")
// create a udf to update name according to age and sex
val append = udf((name: String, maritalStatus:String, sex: String) => {
if (sex.equalsIgnoreCase("F") && maritalStatus.equalsIgnoreCase("M")) s"Mrs. ${name}"
else if (sex.equalsIgnoreCase("F")) s"Ms. ${name}"
else s"Mr. ${name}"
})
//add two new columns with using withColumn
df.withColumn("name", append($"name", $"maritalStatus", $"sex"))
.withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y")).show
输出:
+---+-------------+------------+---+-------------+
|age|maritalStatus| name|sex|seniorCitizen|
+---+-------------+------------+---+-------------+
| 35| M| Mrs. Joanna| F| N|
| 25| S|Ms. Isabelle| F| N|
| 19| S| Mr. Andy| M| N|
| 70| M| Mr. Robert| M| Y|
+---+-------------+------------+---+-------------+
编辑:
这是不使用 UDF 的输出
df.withColumn("name",
when($"sex" === "F", when($"maritalStatus" === "M", concat(lit("Ms. "), df("name"))).otherwise(concat(lit("Ms. "), df("name"))))
.otherwise(concat(lit("Ms. "), df("name"))))
.withColumn("seniorCitizen", when($"age" < 60, "N").otherwise("Y"))
希望对您有所帮助!
Spark functions 可以帮助您完成工作。您可以组合 when
、concat
、lit
函数,如下所述
val updateName = when(lower($"maritalStatus") === "m" && lower($"sex") === "f", concat(lit("Mrs. "), $"name"))
.otherwise(when(lower($"maritalStatus") === "s" && lower($"sex") === "f", concat(lit("Ms. "), $"name"))
.otherwise(when(lower($"sex") === "m", concat(lit("Mr. "), $"name"))))
val updatedDataSet = dataset.withColumn("name", updateName)
.withColumn("seniorCitizen", when($"age" > 60, "Y").otherwise("N"))
updatedDataSet
是您需要的 dataset