从 Spark Dataframe 构建 2D 查找 table
build a 2D lookup table from Spark Dataframe
我想将一个较小的数据帧转换为广播查找 table,以便在另一个较大数据帧的 UDF 中使用。这个较小的数据框 (myLookupDf) 可能如下所示:
+---+---+---+---+
| x | 90|100|101|
+---+---+---+---+
| 90| 1| 0| 0|
|100| 0| 1| 1|
|101| 0| 1| 1|
+---+---+---+---+
我想使用第一列作为第一个键,比如 x1,第一行作为第二个键。 x1 和 x2 具有相同的元素。理想情况下,查找 table (myLookupMap) 将是一个 Scala Map(或类似的)并且工作方式如下:
myLookupMap(90)(90) returns 1
myLookupMap(90)(101) returns 0
myLookupMap(100)(90) returns 0
myLookupMap(101)(100) return 1
etc.
到目前为止,我设法拥有:
val myLookupMap = myLookupDf.collect().map(r => Map(myLookupDf.columns.zip(r.toSeq):_*))
myLookupMap: Array[scala.collection.Map[String,Any]] = Array(Map(x -> 90, 90 -> 1, 100 -> 0, 101 -> 0), Map(x -> 100, 90 -> 0, 100 -> 1, 101 -> 1), Map(x -> 101, 90 -> 0, 100 -> 1, 101 -> 1))
这是一个地图数组,并不完全是必需的。非常感谢任何建议。
collect()
总是创建等同于 Array
的 rdd
。您必须想方设法将 arrays
收集为 maps
.
给定 dataframe
作为
scala> myLookupDf.show(false)
+---+---+---+---+
|x |90 |100|101|
+---+---+---+---+
|90 |1 |0 |0 |
|100|0 |1 |1 |
|101|0 |1 |1 |
+---+---+---+---+
您只需要 x
以外的 header 个名称,这样您就可以执行如下操作
scala> val header = myLookupDf.schema.fieldNames.tail
header: Array[String] = Array(90, 100, 101)
我只是修改你的 map
函数以获得 Map
作为结果
scala> val myLookupMap = myLookupDf.rdd.map(r => {
| val row = r.toSeq
| (row.head, Map(header.zip(row.tail):_*))
| }).collectAsMap()
myLookupMap: scala.collection.Map[Any,scala.collection.immutable.Map[String,Any]] = Map(101 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 100 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 90 -> Map(90 -> 1, 100 -> 0, 101 -> 0))
您应该会看到您得到了想要的结果。
scala> myLookupMap(90)(90.toString)
res1: Any = 1
scala> myLookupMap(90)(101.toString)
res2: Any = 0
scala> myLookupMap(100)(90.toString)
res3: Any = 0
scala> myLookupMap(101)(100.toString)
res4: Any = 1
现在您可以将 myLookupMap
传递给您的 udf
函数
我想将一个较小的数据帧转换为广播查找 table,以便在另一个较大数据帧的 UDF 中使用。这个较小的数据框 (myLookupDf) 可能如下所示:
+---+---+---+---+
| x | 90|100|101|
+---+---+---+---+
| 90| 1| 0| 0|
|100| 0| 1| 1|
|101| 0| 1| 1|
+---+---+---+---+
我想使用第一列作为第一个键,比如 x1,第一行作为第二个键。 x1 和 x2 具有相同的元素。理想情况下,查找 table (myLookupMap) 将是一个 Scala Map(或类似的)并且工作方式如下:
myLookupMap(90)(90) returns 1
myLookupMap(90)(101) returns 0
myLookupMap(100)(90) returns 0
myLookupMap(101)(100) return 1
etc.
到目前为止,我设法拥有:
val myLookupMap = myLookupDf.collect().map(r => Map(myLookupDf.columns.zip(r.toSeq):_*))
myLookupMap: Array[scala.collection.Map[String,Any]] = Array(Map(x -> 90, 90 -> 1, 100 -> 0, 101 -> 0), Map(x -> 100, 90 -> 0, 100 -> 1, 101 -> 1), Map(x -> 101, 90 -> 0, 100 -> 1, 101 -> 1))
这是一个地图数组,并不完全是必需的。非常感谢任何建议。
collect()
总是创建等同于 Array
的 rdd
。您必须想方设法将 arrays
收集为 maps
.
给定 dataframe
作为
scala> myLookupDf.show(false)
+---+---+---+---+
|x |90 |100|101|
+---+---+---+---+
|90 |1 |0 |0 |
|100|0 |1 |1 |
|101|0 |1 |1 |
+---+---+---+---+
您只需要 x
以外的 header 个名称,这样您就可以执行如下操作
scala> val header = myLookupDf.schema.fieldNames.tail
header: Array[String] = Array(90, 100, 101)
我只是修改你的 map
函数以获得 Map
作为结果
scala> val myLookupMap = myLookupDf.rdd.map(r => {
| val row = r.toSeq
| (row.head, Map(header.zip(row.tail):_*))
| }).collectAsMap()
myLookupMap: scala.collection.Map[Any,scala.collection.immutable.Map[String,Any]] = Map(101 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 100 -> Map(90 -> 0, 100 -> 1, 101 -> 1), 90 -> Map(90 -> 1, 100 -> 0, 101 -> 0))
您应该会看到您得到了想要的结果。
scala> myLookupMap(90)(90.toString)
res1: Any = 1
scala> myLookupMap(90)(101.toString)
res2: Any = 0
scala> myLookupMap(100)(90.toString)
res3: Any = 0
scala> myLookupMap(101)(100.toString)
res4: Any = 1
现在您可以将 myLookupMap
传递给您的 udf
函数