从 Spark Dataframe 构建 2D 查找 table

build a 2D lookup table from Spark Dataframe

我想将一个较小的数据帧转换为广播查找 table,以便在另一个较大数据帧的 UDF 中使用。这个较小的数据框 (myLookupDf) 可能如下所示:

+---+---+---+---+
| x | 90|100|101|
+---+---+---+---+
| 90|  1|  0|  0|
|100|  0|  1|  1|
|101|  0|  1|  1|
+---+---+---+---+

我想使用第一列作为第一个键,比如 x1,第一行作为第二个键。 x1 和 x2 具有相同的元素。理想情况下,查找 table (myLookupMap) 将是一个 Scala Map(或类似的)并且工作方式如下:

myLookupMap(90)(90) returns 1
myLookupMap(90)(101) returns 0
myLookupMap(100)(90) returns 0
myLookupMap(101)(100) return 1
etc.

到目前为止,我设法拥有:

val myLookupMap = myLookupDf.collect().map(r => Map(myLookupDf.columns.zip(r.toSeq):_*))
myLookupMap: Array[scala.collection.Map[String,Any]] = Array(Map(x -> 90, 90 -> 1, 100 -> 0, 101 -> 0), Map(x -> 100, 90 -> 0, 100 -> 1, 101 -> 1), Map(x -> 101, 90 -> 0, 100 -> 1, 101 -> 1))

这是一个地图数组,并不完全是必需的。非常感谢任何建议。

collect() 总是创建等同于 Arrayrdd。您必须想方设法将 arrays 收集为 maps.

给定 dataframe 作为

scala> myLookupDf.show(false)
+---+---+---+---+
|x  |90 |100|101|
+---+---+---+---+
|90 |1  |0  |0  |
|100|0  |1  |1  |
|101|0  |1  |1  |
+---+---+---+---+

您只需要 x 以外的 header 个名称,这样您就可以执行如下操作

scala>     val header = myLookupDf.schema.fieldNames.tail
header: Array[String] = Array(90, 100, 101)

我只是修改你的 map 函数以获得 Map 作为结果

scala>     val myLookupMap = myLookupDf.rdd.map(r => {
     |       val row = r.toSeq
     |       (row.head, Map(header.zip(row.tail):_*))
     |     }).collectAsMap()
myLookupMap: scala.collection.Map[Any,scala.collection.immutable.Map[String,Any]] = Map(101 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 100 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 90 -> Map(90 -> 1, 100 -> 0, 101 -> 0))

您应该会看到您得到了想要的结果。

scala> myLookupMap(90)(90.toString)
res1: Any = 1

scala> myLookupMap(90)(101.toString)
res2: Any = 0

scala> myLookupMap(100)(90.toString)
res3: Any = 0

scala> myLookupMap(101)(100.toString)
res4: Any = 1

现在您可以将 myLookupMap 传递给您的 udf 函数