如何根据来自其他数据集的值有效地映射来自一个数据集的键
How do I efficiently map keys from one dataset based on values from other dataset
假设数据框 1 代表目标国家和源国家列表,数据框 2 代表所有国家的可用性,从数据框 1 中找到目标国家映射为 TRUE 并且源国家/地区映射是 FALSE:
数据帧 1(targetId、sourceId):
美国:中国、俄罗斯、印度、日本
中国:美国、俄罗斯、印度
俄罗斯:美国、日本
数据框 2(id,可用):
美国:真实
中国:false
俄罗斯:真实
印度:错误
日本:正确
结果数据集应如下所示:
(美国,中国),
(美国、印度)
我的想法是首先分解数据集 1,创建新数据框(比如 tempDF),向其中添加 2 个新列:targetAvailable、sourceAvailable,最后过滤 targetAvailable = false 和 sourceAvailable = true 以获得所需的结果数据框。
下面是我的代码片段:
val sourceDF = sourceData.toDF("targetId", "sourceId")
val mappingDF = mappingData.toDF("id", "available")
val tempDF = sourceDF.select(col("targetId"),
explode(col("sourceId")).as("source_id_split"))
val resultDF = tempDF.select("targetId")
.withColumn("targetAvailable", isAvailable(tempDF.col("targetId")))
.withColumn("sourceAvailable", isAvailable(tempDF.col("source_id_split")))
/*resultDF.select("targetId", "sourceId").
filter(col("targetAvailable") === "true" and col("sourceAvailable")
=== "false").show()*/
// udf to find the availability value for the given id from the mapping table
val isAvailable = udf((searchId: String) => {
val rows = mappingDF.select("available")
.filter(col("id") === searchId).collect()
if (rows(0)(0).toString.equals("true")) "true" else "false" })
在计算 resultDF
时调用 isAvailable
UDF 会抛出一些奇怪的异常。难道我做错了什么?有更好/更简单的方法吗?
在您的 UDF 中,您正在引用另一个数据框,这是不可能的,因此您获得了 "weird" 异常。
您想根据另一个数据框中包含的值过滤一个数据框。您需要做的是连接 id
列。在您的案例中实际上有两个连接,一个用于目标,一个用于源。
但是使用 explode
的想法非常好。这是一种实现您想要的方法:
// generating data, please provide this code next time ;-)
val sourceDF = Seq("USA" -> Seq("China", "Russia", "India", "Japan"),
"China" -> Seq("USA", "Russia", "India"),
"Russia" -> Seq("USA", "Japan"))
.toDF("targetId", "sourceId")
val mappingDF = Seq("USA" -> true, "China" -> false,
"Russia" -> true, "India" -> false,
"Japan" -> true)
.toDF("id", "available")
sourceDF
// we can filter available targets before exploding.
// let's do it to be more efficient.
.join(mappingDF.withColumnRenamed("id", "targetId"), Seq("targetId"))
.where('available)
// exploding the sources
.select('targetId, explode('sourceId) as "sourceId")
// then we keep only non available sources
.join(mappingDF.withColumnRenamed("id", "sourceId"), Seq("sourceId"))
.where(! 'available)
.select("targetId", "sourceId")
.show(false)
产生
+--------+--------+
|targetId|sourceId|
+--------+--------+
|USA |China |
|USA |India |
+--------+--------+
假设数据框 1 代表目标国家和源国家列表,数据框 2 代表所有国家的可用性,从数据框 1 中找到目标国家映射为 TRUE 并且源国家/地区映射是 FALSE:
数据帧 1(targetId、sourceId):
美国:中国、俄罗斯、印度、日本
中国:美国、俄罗斯、印度
俄罗斯:美国、日本
数据框 2(id,可用):
美国:真实
中国:false
俄罗斯:真实
印度:错误
日本:正确
结果数据集应如下所示:
(美国,中国),
(美国、印度)
我的想法是首先分解数据集 1,创建新数据框(比如 tempDF),向其中添加 2 个新列:targetAvailable、sourceAvailable,最后过滤 targetAvailable = false 和 sourceAvailable = true 以获得所需的结果数据框。
下面是我的代码片段:
val sourceDF = sourceData.toDF("targetId", "sourceId")
val mappingDF = mappingData.toDF("id", "available")
val tempDF = sourceDF.select(col("targetId"),
explode(col("sourceId")).as("source_id_split"))
val resultDF = tempDF.select("targetId")
.withColumn("targetAvailable", isAvailable(tempDF.col("targetId")))
.withColumn("sourceAvailable", isAvailable(tempDF.col("source_id_split")))
/*resultDF.select("targetId", "sourceId").
filter(col("targetAvailable") === "true" and col("sourceAvailable")
=== "false").show()*/
// udf to find the availability value for the given id from the mapping table
val isAvailable = udf((searchId: String) => {
val rows = mappingDF.select("available")
.filter(col("id") === searchId).collect()
if (rows(0)(0).toString.equals("true")) "true" else "false" })
在计算 resultDF
时调用 isAvailable
UDF 会抛出一些奇怪的异常。难道我做错了什么?有更好/更简单的方法吗?
在您的 UDF 中,您正在引用另一个数据框,这是不可能的,因此您获得了 "weird" 异常。
您想根据另一个数据框中包含的值过滤一个数据框。您需要做的是连接 id
列。在您的案例中实际上有两个连接,一个用于目标,一个用于源。
但是使用 explode
的想法非常好。这是一种实现您想要的方法:
// generating data, please provide this code next time ;-)
val sourceDF = Seq("USA" -> Seq("China", "Russia", "India", "Japan"),
"China" -> Seq("USA", "Russia", "India"),
"Russia" -> Seq("USA", "Japan"))
.toDF("targetId", "sourceId")
val mappingDF = Seq("USA" -> true, "China" -> false,
"Russia" -> true, "India" -> false,
"Japan" -> true)
.toDF("id", "available")
sourceDF
// we can filter available targets before exploding.
// let's do it to be more efficient.
.join(mappingDF.withColumnRenamed("id", "targetId"), Seq("targetId"))
.where('available)
// exploding the sources
.select('targetId, explode('sourceId) as "sourceId")
// then we keep only non available sources
.join(mappingDF.withColumnRenamed("id", "sourceId"), Seq("sourceId"))
.where(! 'available)
.select("targetId", "sourceId")
.show(false)
产生
+--------+--------+
|targetId|sourceId|
+--------+--------+
|USA |China |
|USA |India |
+--------+--------+