Spark 数据集 API - 加入
Spark Dataset API - join
我正在尝试使用 Spark Dataset API 但我在进行简单连接时遇到了一些问题。
假设我有两个包含字段的数据集:date | value
,那么在 DataFrame
的情况下,我的连接看起来像:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date") )
但是 Dataset
有 .joinWith
方法,但同样的方法不起作用:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
.joinWith
要求的参数是什么?
要使用 joinWith
,您首先必须创建一个 DataSet
,很可能是两个。要创建 DataSet
,您需要创建一个与您的架构匹配的案例 class 并调用 DataFrame.as[T]
,其中 T
是您的案例 class。所以:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
您也可以跳过大小写 class 并使用元组:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
然后如果你有另一个案例class / DF,像这样说:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
那么,虽然join
和joinWith
的语法相似,但结果不同:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
如您所见,joinWith
将对象完整保留为元组的一部分,而 join
将列扁平化为单个命名空间。 (在上述情况下会出现问题,因为列名 "key" 重复。)
奇怪的是,我必须使用 df.col("key")
和 df2.col("key")
来创建加入 ds
和 ds2
的条件——如果您只使用 col("key")
在任何一方都不起作用,并且 ds.col(...)
不存在。但是,使用原始的 df.col("key")
就可以了。
来自https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html
看来你可以做到
dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
对于上面的例子,你可以试试下面的方法:
为您的输出定义一个案例class
case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)
用Seq("key")
连接两个数据集,这将帮助您避免输出中出现两个重复的键列,这也有助于在下一步应用案例class或获取数据
val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
// res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]
结果将是平坦的:
joined.show
+---+-----+----+----+
|key|value|num1|num2|
+---+-----+----+----+
| 1| asdf| 7.7| 101|
| 2|34234| 1.2| 10|
+---+-----+----+----+
我正在尝试使用 Spark Dataset API 但我在进行简单连接时遇到了一些问题。
假设我有两个包含字段的数据集:date | value
,那么在 DataFrame
的情况下,我的连接看起来像:
val dfA : DataFrame
val dfB : DataFrame
dfA.join(dfB, dfB("date") === dfA("date") )
但是 Dataset
有 .joinWith
方法,但同样的方法不起作用:
val dfA : Dataset
val dfB : Dataset
dfA.joinWith(dfB, ? )
.joinWith
要求的参数是什么?
要使用 joinWith
,您首先必须创建一个 DataSet
,很可能是两个。要创建 DataSet
,您需要创建一个与您的架构匹配的案例 class 并调用 DataFrame.as[T]
,其中 T
是您的案例 class。所以:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
您也可以跳过大小写 class 并使用元组:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
然后如果你有另一个案例class / DF,像这样说:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
那么,虽然join
和joinWith
的语法相似,但结果不同:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
如您所见,joinWith
将对象完整保留为元组的一部分,而 join
将列扁平化为单个命名空间。 (在上述情况下会出现问题,因为列名 "key" 重复。)
奇怪的是,我必须使用 df.col("key")
和 df2.col("key")
来创建加入 ds
和 ds2
的条件——如果您只使用 col("key")
在任何一方都不起作用,并且 ds.col(...)
不存在。但是,使用原始的 df.col("key")
就可以了。
来自https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html
看来你可以做到
dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
对于上面的例子,你可以试试下面的方法:
为您的输出定义一个案例class
case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)
用Seq("key")
连接两个数据集,这将帮助您避免输出中出现两个重复的键列,这也有助于在下一步应用案例class或获取数据
val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
// res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]
结果将是平坦的:
joined.show
+---+-----+----+----+
|key|value|num1|num2|
+---+-----+----+----+
| 1| asdf| 7.7| 101|
| 2|34234| 1.2| 10|
+---+-----+----+----+