Spark 连接数据框和数据集
Spark join dataframes & datasets
我有一个名为 Link 的 DataFrame
,在 Row
中动态数量为 fields/columns。
但是,某些字段的结构 [ClassName]Id 包含一个 id
[ClassName]Id 的类型总是 String
我有几个 Datasets
每个不同的类型 [ClassName]
每个 Dataset
至少有字段 id
(String
) 和 typeName
(String
),它总是填充 [=19= 的 String 值]
例如如果我有 3 个 DataSets
类型 A、B 和 C
Link:
+----+-----+-----+-----+
| id | AId | BId | CId |
+----+-----+-----+-----+
| XX | A01 | B02 | C04 |
| XY | null| B05 | C07 |
A:
+-----+----------+-----+-----+
| id | typeName | ... | ... |
+-----+----------+-----+-----+
| A01 | A | ... | ... |
B:
+-----+----------+-----+-----+
| id | typeName | ... | ... |
+-----+----------+-----+-----+
| B02 | B | ... | ... |
首选的最终结果将是 Link Dataframe
,其中每个 ID 都被替换或附加一个名为 [ClassName]
的字段封装的原始对象。
结果:
+----+----------------+----------------+----------------+
| id | A | B | C |
+----+----------------+----------------+----------------+
| XX | A(A01, A, ...) | B(B02, B, ...) | C(C04, C, ...) |
| XY | null | B(B05, B, ...) | C(C07, C, ...) |
我试过的东西
- 对 joinWith 的递归调用。
第一次调用成功返回元组/
Row
,其中第一个元素是原始 Row
,第二个元素是匹配的 [ClassName]
然而,第二次迭代开始嵌套这些结果。
尝试使用 map 'unnest' 这些结果要么导致编码器地狱(因为生成的 Row
不是固定类型),要么编码太复杂以至于导致催化剂 error
- 加入 RDD 还不能解决这个问题。
欢迎提出任何想法。
所以我想出了如何做我想做的事。
我做了一些改变让它为我工作,但它是
出于参考目的,我将展示我的步骤,也许它对将来的人有用?
- 首先我声明一个数据类型,它共享我感兴趣的 A、B、C 等的所有属性,并使 classes 扩展自这个超级类型
case class Base(id: String, typeName: String)
case class A(override val id: String, override val typeName: String) extends Base(id, typeName)
- 接下来我加载 link
Dataframe
val linkDataFrame = spark.read.parquet("[path]")
- 我想将此
DataFrame
转换为可连接的内容,这意味着为连接的源创建一个占位符以及一种将所有单个 Id
字段(AId、BId 等)转换为a Map
of source -> id's。 Spark 有一个有用的 sql map
方法。我们还需要将 Base
class 转换为 StructType
以便在编码器中使用。尝试了多种方法,但无法绕过特定声明(否则会出现投射错误)
val linkDataFrame = spark.read.parquet("[path]")
case class LinkReformatted(ids: Map[String, Long], sources: Map[String, Base])
// Maps each column ending with Id into a Map of (columnname1 (-Id), value1, columnname2 (-Id), value2)
val mapper = linkDataFrame.columns.toList
.filter(
_.matches("(?i).*Id$")
)
.flatMap(
c => List(lit(c.replaceAll("(?i)Id$", "")), col(c))
)
val baseStructType = ScalaReflection.schemaFor[Base].dataType.asInstanceOf[StructType]
- 所有这些部分使得创建一个新的
DataFrame
成为可能,其中 ID 都在一个名为 ids 的字段中,并且 sources 的占位符 在空 Map[String, Base]
val linkDatasetReformatted = linkDataFrame.select(
map(mapper: _*).alias("ids")
)
.withColumn("sources", lit(null).cast(MapType(StringType, baseStructType)))
.as[LinkReformatted]
- 下一步是将所有源
Datasets
(A、B 等)加入这个重新格式化的 Link 数据集。这个尾递归方法发生了很多事情
@tailrec
def recursiveJoinBases(sourceDataset: Dataset[LinkReformatted], datasets: List[Dataset[Base]]): Dataset[LinkReformatted] = datasets match {
case Nil => sourceDataset // Nothing left to join, return it
case baseDataset :: remainingDatasets => {
val typeName = baseDataset.head.typeName // extract the type from base (each field hase same value)
val masterName = "source" // something to name the source
val joinedDataset = sourceDataset.as(masterName) // joining source
.joinWith(
baseDataset.as(typeName), // with a base A,B, etc
col(s"$typeName.id") === col(s"$masterName.ids.$typeName"), // join on source.ids.[typeName]
"left_outer"
)
.map {
case (source, base) => {
val newSources = if (source.sources == null) Map(typeName -> base) else source.sources + (typeName -> base) // append or create map of sources
source.copy(sources = newSources)
}
}
.as[LinkReformatted]
recursiveJoinBases(joinedDataset, remainingDatasets)
}
}
- 您现在得到
Dataset
条 LinkReformatted
条记录,其中 ids 字段中的每个对应 typeName -> id
对应 typeName -> Base
在 来源 字段中。
对我来说这就足够了。我可以在这个最终数据集上使用一些地图函数提取我需要的一切
希望对您有所帮助。我知道这不是我要问的确切解决方案,也不是很简单。
我有一个名为 Link 的 DataFrame
,在 Row
中动态数量为 fields/columns。
但是,某些字段的结构 [ClassName]Id 包含一个 id
[ClassName]Id 的类型总是 String
我有几个 Datasets
每个不同的类型 [ClassName]
每个 Dataset
至少有字段 id
(String
) 和 typeName
(String
),它总是填充 [=19= 的 String 值]
例如如果我有 3 个 DataSets
类型 A、B 和 C
Link:
+----+-----+-----+-----+
| id | AId | BId | CId |
+----+-----+-----+-----+
| XX | A01 | B02 | C04 |
| XY | null| B05 | C07 |
A:
+-----+----------+-----+-----+
| id | typeName | ... | ... |
+-----+----------+-----+-----+
| A01 | A | ... | ... |
B:
+-----+----------+-----+-----+
| id | typeName | ... | ... |
+-----+----------+-----+-----+
| B02 | B | ... | ... |
首选的最终结果将是 Link Dataframe
,其中每个 ID 都被替换或附加一个名为 [ClassName]
的字段封装的原始对象。
结果:
+----+----------------+----------------+----------------+
| id | A | B | C |
+----+----------------+----------------+----------------+
| XX | A(A01, A, ...) | B(B02, B, ...) | C(C04, C, ...) |
| XY | null | B(B05, B, ...) | C(C07, C, ...) |
我试过的东西
- 对 joinWith 的递归调用。
第一次调用成功返回元组/
Row
,其中第一个元素是原始Row
,第二个元素是匹配的[ClassName]
然而,第二次迭代开始嵌套这些结果。 尝试使用 map 'unnest' 这些结果要么导致编码器地狱(因为生成的Row
不是固定类型),要么编码太复杂以至于导致催化剂 error - 加入 RDD 还不能解决这个问题。
欢迎提出任何想法。
所以我想出了如何做我想做的事。 我做了一些改变让它为我工作,但它是 出于参考目的,我将展示我的步骤,也许它对将来的人有用?
- 首先我声明一个数据类型,它共享我感兴趣的 A、B、C 等的所有属性,并使 classes 扩展自这个超级类型
case class Base(id: String, typeName: String)
case class A(override val id: String, override val typeName: String) extends Base(id, typeName)
- 接下来我加载 link
Dataframe
val linkDataFrame = spark.read.parquet("[path]")
- 我想将此
DataFrame
转换为可连接的内容,这意味着为连接的源创建一个占位符以及一种将所有单个Id
字段(AId、BId 等)转换为aMap
of source -> id's。 Spark 有一个有用的 sqlmap
方法。我们还需要将Base
class 转换为StructType
以便在编码器中使用。尝试了多种方法,但无法绕过特定声明(否则会出现投射错误)
val linkDataFrame = spark.read.parquet("[path]")
case class LinkReformatted(ids: Map[String, Long], sources: Map[String, Base])
// Maps each column ending with Id into a Map of (columnname1 (-Id), value1, columnname2 (-Id), value2)
val mapper = linkDataFrame.columns.toList
.filter(
_.matches("(?i).*Id$")
)
.flatMap(
c => List(lit(c.replaceAll("(?i)Id$", "")), col(c))
)
val baseStructType = ScalaReflection.schemaFor[Base].dataType.asInstanceOf[StructType]
- 所有这些部分使得创建一个新的
DataFrame
成为可能,其中 ID 都在一个名为 ids 的字段中,并且 sources 的占位符 在空Map[String, Base]
val linkDatasetReformatted = linkDataFrame.select(
map(mapper: _*).alias("ids")
)
.withColumn("sources", lit(null).cast(MapType(StringType, baseStructType)))
.as[LinkReformatted]
- 下一步是将所有源
Datasets
(A、B 等)加入这个重新格式化的 Link 数据集。这个尾递归方法发生了很多事情
@tailrec
def recursiveJoinBases(sourceDataset: Dataset[LinkReformatted], datasets: List[Dataset[Base]]): Dataset[LinkReformatted] = datasets match {
case Nil => sourceDataset // Nothing left to join, return it
case baseDataset :: remainingDatasets => {
val typeName = baseDataset.head.typeName // extract the type from base (each field hase same value)
val masterName = "source" // something to name the source
val joinedDataset = sourceDataset.as(masterName) // joining source
.joinWith(
baseDataset.as(typeName), // with a base A,B, etc
col(s"$typeName.id") === col(s"$masterName.ids.$typeName"), // join on source.ids.[typeName]
"left_outer"
)
.map {
case (source, base) => {
val newSources = if (source.sources == null) Map(typeName -> base) else source.sources + (typeName -> base) // append or create map of sources
source.copy(sources = newSources)
}
}
.as[LinkReformatted]
recursiveJoinBases(joinedDataset, remainingDatasets)
}
}
- 您现在得到
Dataset
条LinkReformatted
条记录,其中 ids 字段中的每个对应typeName -> id
对应typeName -> Base
在 来源 字段中。 对我来说这就足够了。我可以在这个最终数据集上使用一些地图函数提取我需要的一切
希望对您有所帮助。我知道这不是我要问的确切解决方案,也不是很简单。