如何根据来自其他数据集的值有效地映射来自一个数据集的键

How do I efficiently map keys from one dataset based on values from other dataset

假设数据框 1 代表目标国家和源国家列表,数据框 2 代表所有国家的可用性,从数据框 1 中找到目标国家映射为 TRUE 并且源国家/地区映射是 FALSE:

数据帧 1(targetId、sourceId):
美国:中国、俄罗斯、印度、日本
中国:美国、俄罗斯、印度
俄罗斯:美国、日本

数据框 2(id,可用):
美国:真实
中国:false
俄罗斯:真实
印度:错误
日本:正确

结果数据集应如下所示:
(美国,中国),
(美国、印度)

我的想法是首先分解数据集 1,创建新数据框(比如 tempDF),向其中添加 2 个新列:targetAvailable、sourceAvailable,最后过滤 targetAvailable = false 和 sourceAvailable = true 以获得所需的结果数据框。

下面是我的代码片段:

 val sourceDF = sourceData.toDF("targetId", "sourceId")
 val mappingDF = mappingData.toDF("id", "available")
 val tempDF = sourceDF.select(col("targetId"), 
                explode(col("sourceId")).as("source_id_split"))

 val resultDF = tempDF.select("targetId")
         .withColumn("targetAvailable", isAvailable(tempDF.col("targetId")))
         .withColumn("sourceAvailable", isAvailable(tempDF.col("source_id_split")))


 /*resultDF.select("targetId", "sourceId").
  filter(col("targetAvailable") === "true" and col("sourceAvailable") 
  === "false").show()*/


// udf to find the availability value for the given id from the mapping table
val isAvailable = udf((searchId: String) => {
val rows = mappingDF.select("available")
          .filter(col("id") === searchId).collect()

if (rows(0)(0).toString.equals("true")) "true" else "false"  })

在计算 resultDF 时调用 isAvailable UDF 会抛出一些奇怪的异常。难道我做错了什么?有更好/更简单的方法吗?

在您的 UDF 中,您正在引用另一个数据框,这是不可能的,因此您获得了 "weird" 异常。

您想根据另一个数据框中包含的值过滤一个数据框。您需要做的是连接 id 列。在您的案例中实际上有两个连接,一个用于目标,一个用于源。

但是使用 explode 的想法非常好。这是一种实现您想要的方法:

// generating data, please provide this code next time ;-)
val sourceDF = Seq("USA" ->  Seq("China", "Russia", "India", "Japan"),
                   "China" -> Seq("USA", "Russia", "India"),
                   "Russia" -> Seq("USA", "Japan"))
               .toDF("targetId", "sourceId")
val mappingDF = Seq("USA" -> true, "China" -> false,
                    "Russia" -> true, "India" -> false,
                    "Japan" -> true)
               .toDF("id", "available")

sourceDF
    // we can filter available targets before exploding.
    // let's do it to be more efficient.
    .join(mappingDF.withColumnRenamed("id", "targetId"), Seq("targetId"))
    .where('available)
    // exploding the sources
    .select('targetId, explode('sourceId) as "sourceId")
    // then we keep only non available sources
    .join(mappingDF.withColumnRenamed("id", "sourceId"), Seq("sourceId"))
    .where(! 'available)
    .select("targetId", "sourceId")
    .show(false)

产生

+--------+--------+
|targetId|sourceId|
+--------+--------+
|USA     |China   |
|USA     |India   |
+--------+--------+