Spark:如何使用来自不同数据帧的嵌套数据来查找匹配项(Scala)
Spark: How to use nested data from different dataframes to find match (Scala)
我有"doorsDF" var DataFrame,如下(简化):
+-----+-----+-----+------+
| door| x| y| usage|
+-----+-----+-----+------+
| a| 32| 14| 5|
| b| 28| 53| 1|
| c| 65| 94| 23|
| d| 68| 53| 1|
| e| 51| 94| 12|
+-----+-----+-----+------+
我有 "peopleDF" var DataFrame 如下(简化):
+-------+-----+
| person| x|
+-------+-----+
| foo| 30|
| bar| 66|
| Morty| 52|
+-------+-----+
只显示前 3 行
我想为一个人进入建筑物使用什么门做一个基线'prediction'。重要的是一个人是否在5'x'个单位以内。
例如,foo 会穿过门 'a' 和 'b'。莫蒂会穿过门 'c' 和 'd'。酒吧只会穿过门 'e'.
到目前为止的进展:尝试使用以下方法将 "potential exits" 列添加到第二个 DataFrame:
val sequence = doorsDF.select("door").rdd.map(r => r(0)).collect().mkString(",")
val doorColumn = lit(sequence)
peopleDF = peopleDF.withColumn("potentialDoors", doorColumn)
结果(新人DF):
+-------+-----+---------------+
| person| x| potentialDoors|
+-------+-----+---------------+
| foo| 30| a,b,c,d,e|
| bar| 66| a,b,c,d,e|
| Morty| 52| a,b,c,d,e|
+-------+-----+---------------+
但是现在,当我尝试使用 udf 循环遍历潜在的门,使用它们的名称来过滤 doorsDF 并获取其 x 值以与 peopleDF 中的 x 值进行比较时,我得到了一个空指针异常。我读到这是因为我试图在嵌套情况下使用过滤器。
必须有更好的方法来使用可用的函数来执行此操作,甚至可能不求助于 udf 或 rdd,但它让我望而却步。我已经用了很长时间了。
此外,我最初选择一串 potentialDoors(而不是数组)的原因是因为我发现它更容易处理我的目的,但我愿意接受建议。
感谢任何帮助!
我已经通过执行以下操作暂时解决了这个问题:
- 将 doorsDF DataFrame 转换为行数组。让我们说 doorsArr
- 在将 UDF 应用于 peopleDF 以添加 potentialDoor 列时,我遍历了 doorsArr 数组而不是尝试 filter/query 门 DF。
现在我可以成功减少 "potential" 池了。
我不禁觉得这仍然效率低下,需要进行一些硬编码。所以我仍然愿意接受更清洁的建议!我想更好地使用提供的功能。感谢您花时间阅读。
我有"doorsDF" var DataFrame,如下(简化):
+-----+-----+-----+------+
| door| x| y| usage|
+-----+-----+-----+------+
| a| 32| 14| 5|
| b| 28| 53| 1|
| c| 65| 94| 23|
| d| 68| 53| 1|
| e| 51| 94| 12|
+-----+-----+-----+------+
我有 "peopleDF" var DataFrame 如下(简化):
+-------+-----+
| person| x|
+-------+-----+
| foo| 30|
| bar| 66|
| Morty| 52|
+-------+-----+
只显示前 3 行
我想为一个人进入建筑物使用什么门做一个基线'prediction'。重要的是一个人是否在5'x'个单位以内。
例如,foo 会穿过门 'a' 和 'b'。莫蒂会穿过门 'c' 和 'd'。酒吧只会穿过门 'e'.
到目前为止的进展:尝试使用以下方法将 "potential exits" 列添加到第二个 DataFrame:
val sequence = doorsDF.select("door").rdd.map(r => r(0)).collect().mkString(",")
val doorColumn = lit(sequence)
peopleDF = peopleDF.withColumn("potentialDoors", doorColumn)
结果(新人DF):
+-------+-----+---------------+
| person| x| potentialDoors|
+-------+-----+---------------+
| foo| 30| a,b,c,d,e|
| bar| 66| a,b,c,d,e|
| Morty| 52| a,b,c,d,e|
+-------+-----+---------------+
但是现在,当我尝试使用 udf 循环遍历潜在的门,使用它们的名称来过滤 doorsDF 并获取其 x 值以与 peopleDF 中的 x 值进行比较时,我得到了一个空指针异常。我读到这是因为我试图在嵌套情况下使用过滤器。
必须有更好的方法来使用可用的函数来执行此操作,甚至可能不求助于 udf 或 rdd,但它让我望而却步。我已经用了很长时间了。
此外,我最初选择一串 potentialDoors(而不是数组)的原因是因为我发现它更容易处理我的目的,但我愿意接受建议。
感谢任何帮助!
我已经通过执行以下操作暂时解决了这个问题:
- 将 doorsDF DataFrame 转换为行数组。让我们说 doorsArr
- 在将 UDF 应用于 peopleDF 以添加 potentialDoor 列时,我遍历了 doorsArr 数组而不是尝试 filter/query 门 DF。
现在我可以成功减少 "potential" 池了。
我不禁觉得这仍然效率低下,需要进行一些硬编码。所以我仍然愿意接受更清洁的建议!我想更好地使用提供的功能。感谢您花时间阅读。