使用来自另一个数据集的值搜索和更新 Spark 数据集列
Searching and updating a Spark Dataset column with values from another Dataset
Java 8 和 Spark 2.11:2.3.2 在这里。虽然我非常喜欢 Java API 答案,但我确实会说一点 Scala,所以我能够理解其中提供的任何答案!但是 Java 如果可能的话(请)!
我有两个具有不同模式的数据集,例外 一个常见的“model_number
”(字符串)列:两者都存在。
对于我的第一个数据集(我们称之为 d1
)中的每一行,我需要 scan/search 第二个数据集(“d2
”)以查看是否有具有相同 model_number
的行,如果是,则更新另一个 d2
列。
这是我的数据集模式:
d1
===========
model_number : string
desc : string
fizz : string
buzz : date
d2
===========
model_number : string
price : double
source : string
所以,如果 d1
行的 model_number
为 12345,而 d2
行也有相同的 model_number
,我想通过将 d2.price
乘以 10.0
.
来更新 d2.price
迄今为止我最好的尝试:
// I *think* this would give me a 3rd dataset with all d1 and d2 columns, but only
// containing rows from d1 and d2 that have matching 'model_number' values
Dataset<Row> d3 = d1.join(d2, d1.col("model_number") == d2.col("model_number"));
// now I just need to update d2.price based on matching
Dataset<Row> d4 = d3.withColumn("adjusted_price", d3.col("price") * 10.0);
有人能帮我越过终点线吗?在此先感谢!
这里有几点,正如@VamsiPrabhala 在评论中提到的,您需要在特定字段上使用的功能是 join
。关于“update
”,你需要记住spark
中的df
、ds
和rdd
是不可变的,所以你不能update
他们。所以,这里的解决方案是,在 join
你的 df
之后,你需要执行你的计算,在这种情况下乘法,在 select
或使用 withColumn
然后select
。换句话说,您不能更新列,但可以使用“new
”列创建新的 df
。
示例:
Input data:
+------------+------+------+----+
|model_number| desc| fizz|buzz|
+------------+------+------+----+
| model_a|desc_a|fizz_a|null|
| model_b|desc_b|fizz_b|null|
+------------+------+------+----+
+------------+-----+--------+
|model_number|price| source|
+------------+-----+--------+
| model_a| 10.0|source_a|
| model_b| 20.0|source_b|
+------------+-----+--------+
使用join
将输出:
val joinedDF = d1.join(d2, "model_number")
joinedDF.show()
+------------+------+------+----+-----+--------+
|model_number| desc| fizz|buzz|price| source|
+------------+------+------+----+-----+--------+
| model_a|desc_a|fizz_a|null| 10.0|source_a|
| model_b|desc_b|fizz_b|null| 20.0|source_b|
+------------+------+------+----+-----+--------+
应用您的计算:
joinedDF.withColumn("price", col("price") * 10).show()
output:
+------------+------+------+----+-----+--------+
|model_number| desc| fizz|buzz|price| source|
+------------+------+------+----+-----+--------+
| model_a|desc_a|fizz_a|null| 100.0|source_a|
| model_b|desc_b|fizz_b|null| 200.0|source_b|
+------------+------+------+----+-----+--------+
Java 8 和 Spark 2.11:2.3.2 在这里。虽然我非常喜欢 Java API 答案,但我确实会说一点 Scala,所以我能够理解其中提供的任何答案!但是 Java 如果可能的话(请)!
我有两个具有不同模式的数据集,例外 一个常见的“model_number
”(字符串)列:两者都存在。
对于我的第一个数据集(我们称之为 d1
)中的每一行,我需要 scan/search 第二个数据集(“d2
”)以查看是否有具有相同 model_number
的行,如果是,则更新另一个 d2
列。
这是我的数据集模式:
d1
===========
model_number : string
desc : string
fizz : string
buzz : date
d2
===========
model_number : string
price : double
source : string
所以,如果 d1
行的 model_number
为 12345,而 d2
行也有相同的 model_number
,我想通过将 d2.price
乘以 10.0
.
d2.price
迄今为止我最好的尝试:
// I *think* this would give me a 3rd dataset with all d1 and d2 columns, but only
// containing rows from d1 and d2 that have matching 'model_number' values
Dataset<Row> d3 = d1.join(d2, d1.col("model_number") == d2.col("model_number"));
// now I just need to update d2.price based on matching
Dataset<Row> d4 = d3.withColumn("adjusted_price", d3.col("price") * 10.0);
有人能帮我越过终点线吗?在此先感谢!
这里有几点,正如@VamsiPrabhala 在评论中提到的,您需要在特定字段上使用的功能是 join
。关于“update
”,你需要记住spark
中的df
、ds
和rdd
是不可变的,所以你不能update
他们。所以,这里的解决方案是,在 join
你的 df
之后,你需要执行你的计算,在这种情况下乘法,在 select
或使用 withColumn
然后select
。换句话说,您不能更新列,但可以使用“new
”列创建新的 df
。
示例:
Input data:
+------------+------+------+----+
|model_number| desc| fizz|buzz|
+------------+------+------+----+
| model_a|desc_a|fizz_a|null|
| model_b|desc_b|fizz_b|null|
+------------+------+------+----+
+------------+-----+--------+
|model_number|price| source|
+------------+-----+--------+
| model_a| 10.0|source_a|
| model_b| 20.0|source_b|
+------------+-----+--------+
使用join
将输出:
val joinedDF = d1.join(d2, "model_number")
joinedDF.show()
+------------+------+------+----+-----+--------+
|model_number| desc| fizz|buzz|price| source|
+------------+------+------+----+-----+--------+
| model_a|desc_a|fizz_a|null| 10.0|source_a|
| model_b|desc_b|fizz_b|null| 20.0|source_b|
+------------+------+------+----+-----+--------+
应用您的计算:
joinedDF.withColumn("price", col("price") * 10).show()
output:
+------------+------+------+----+-----+--------+
|model_number| desc| fizz|buzz|price| source|
+------------+------+------+----+-----+--------+
| model_a|desc_a|fizz_a|null| 100.0|source_a|
| model_b|desc_b|fizz_b|null| 200.0|source_b|
+------------+------+------+----+-----+--------+