scala- 数据集- 如何在两种情况 类 上使用一个函数?

scala- dataset- how to use a function on two case classes?

我有两个数据集并创建了两个案例类(我不将ds连接在一起的原因是因为我想return ds B中的第一个匹配记录与ds中的键一)

以下是我目前的尝试。如何将该函数映射到两种情况 类?非常感谢!我在这里很新。

case class a_class (idA: Int, numA: Int)
case class b_class(idB: Int, numB:Int )
def findNum(a:a_Class, b:b_class): Int = {
     if (a.idA =!=b.idB){
            break
       }else{
       return  b.numB
       }
 }
aTb.createOrReplaceTempView("tableA") 
bTb.createOrReplaceTempView("tableB")
var aDS = sqlContext.table("tableA").as[a_class]
var bDS = sqlContext.table("pview").as[b_class]
 //a_class.map(, => )).show //how do I use findNum function here? 

示例输入:

 +------+---+
 |idA  |numA|
 +------+---+
 |    a |100|
 |    b |200|
 +------+---+

 +------+---+
 |idB   |numB|
 +------+---+
 |a     |500|
 |a     |600|
 +------+---+

因此预期输出为 500,因为第一行是 table B

中的第一条匹配记录

您的解决方案在于 joingroupByaggregation

首先,您的 case class 和样本输入数据不匹配,因为 ab 不能是 int 类型。所以 case classes 应该是

case class a_class (idA: String, numA: Int)
case class b_class(idB: String, numB:Int )

使用这些 case classes 您可以创建 dataSets。出于测试目的,我创建如下

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val tableA = Seq(
  a_class("a", 100),
  a_class("b", 200)
).toDS

val tableB = Seq(
  b_class("a", 500),
  b_class("a", 600)
).toDS

那么最后的dataset可以用下面的方法来实现。

tableA.join(tableB, $"idA" === $"idB", "inner") // inner join of two datasets
  .drop("idA", "numA")   //droping columns of tableB
  .groupBy("idB")        //grouping data to get the first of each group
  .agg(first("numB").as("numB"))    //taking the first of each group
  .show(false)

哪个应该给你

+---+----+
|idB|numB|
+---+----+
|a  |600 |
+---+----+

已更新

以上结果与您想要的输出不匹配,这是因为 join 重新排序了 tableB

您只需执行

即可获得所需的输出
tableB.groupBy("idB")
  .agg(first("numB").as("numB"))
  .show(false)

结果将是 tableB

的每个 id 的第一个 row
+---+----+
|idB|numB|
+---+----+
|a  |500 |
+---+----+

如果你只想要第一个 rowsid 匹配 tableA 那么你 jointableA 如上所述,如果你不不想要 tableA 的数据,那么你 drop 他们作为

val tempTableB = tableB.groupBy("idB")
  .agg(first("numB").as("numB"))

tableA.join(tempTableB, $"idA" === $"idB", "inner")
  .drop("idA", "numA")
  .show(false)

输出是 tableB 的第一个 row,与 tableA

id 相匹配
+---+----+
|idB|numB|
+---+----+
|a  |500 |
+---+----+