Spark 连接数据框和数据集

Spark join dataframes & datasets

我有一个名为 LinkDataFrame,在 Row 中动态数量为 fields/columns。 但是,某些字段的结构 [ClassName]Id 包含一个 id

[ClassName]Id 的类型总是 String

我有几个 Datasets 每个不同的类型 [ClassName] 每个 Dataset 至少有字段 id (String) 和 typeName (String),它总是填充 [=19= 的 String 值]

例如如果我有 3 个 DataSets 类型 ABC

Link:

+----+-----+-----+-----+ | id | AId | BId | CId | +----+-----+-----+-----+ | XX | A01 | B02 | C04 | | XY | null| B05 | C07 |

A:

+-----+----------+-----+-----+ | id | typeName | ... | ... | +-----+----------+-----+-----+ | A01 | A | ... | ... |

B:

+-----+----------+-----+-----+ | id | typeName | ... | ... | +-----+----------+-----+-----+ | B02 | B | ... | ... |

首选的最终结果将是 Link Dataframe,其中每个 ID 都被替换或附加一个名为 [ClassName] 的字段封装的原始对象。

结果:

+----+----------------+----------------+----------------+ | id | A | B | C | +----+----------------+----------------+----------------+ | XX | A(A01, A, ...) | B(B02, B, ...) | C(C04, C, ...) | | XY | null | B(B05, B, ...) | C(C07, C, ...) |

我试过的东西

欢迎提出任何想法。

所以我想出了如何做我想做的事。 我做了一些改变让它为我工作,但它是 出于参考目的,我将展示我的步骤,也许它对将来的人有用?

  1. 首先我声明一个数据类型,它共享我感兴趣的 A、B、C 等的所有属性,并使 classes 扩展自这个超级类型
case class Base(id: String, typeName: String)
case class A(override val id: String, override val typeName: String) extends Base(id, typeName)
  1. 接下来我加载 link Dataframe
val linkDataFrame = spark.read.parquet("[path]")
  1. 我想将此 DataFrame 转换为可连接的内容,这意味着为连接的源创建一个占位符以及一种将所有单个 Id 字段(AId、BId 等)转换为a Map of source -> id's。 Spark 有一个有用的 sql map 方法。我们还需要将 Base class 转换为 StructType 以便在编码器中使用。尝试了多种方法,但无法绕过特定声明(否则会出现投射错误)
val linkDataFrame = spark.read.parquet("[path]")

case class LinkReformatted(ids: Map[String, Long], sources: Map[String, Base])

// Maps each column ending with Id into a Map of (columnname1 (-Id), value1, columnname2 (-Id), value2)
val mapper = linkDataFrame.columns.toList
  .filter(
    _.matches("(?i).*Id$")
  )
  .flatMap(
    c => List(lit(c.replaceAll("(?i)Id$", "")), col(c))
)

val baseStructType = ScalaReflection.schemaFor[Base].dataType.asInstanceOf[StructType]
  1. 所有这些部分使得创建一个新的 DataFrame 成为可能,其中 ID 都在一个名为 ids 的字段中,并且 sources 的占位符 在空 Map[String, Base]
val linkDatasetReformatted = linkDataFrame.select(
    map(mapper: _*).alias("ids")
  )
  .withColumn("sources", lit(null).cast(MapType(StringType, baseStructType)))
  .as[LinkReformatted]
  1. 下一步是将所有源 Datasets(A、B 等)加入这个重新格式化的 Link 数据集。这个尾递归方法发生了很多事情
@tailrec
def recursiveJoinBases(sourceDataset: Dataset[LinkReformatted], datasets: List[Dataset[Base]]): Dataset[LinkReformatted] = datasets match {
  case Nil => sourceDataset // Nothing left to join, return it
  case baseDataset :: remainingDatasets => {

    val typeName = baseDataset.head.typeName // extract the type from base (each field hase same value)
    val masterName = "source" // something to name the source

    val joinedDataset = sourceDataset.as(masterName) // joining source
      .joinWith(
      baseDataset.as(typeName), // with a base A,B, etc
      col(s"$typeName.id") === col(s"$masterName.ids.$typeName"), // join on source.ids.[typeName]
      "left_outer"
    )
      .map {
        case (source, base) => {
          val newSources = if (source.sources == null) Map(typeName -> base) else source.sources + (typeName -> base) // append or create map of sources          
          source.copy(sources = newSources)
        }
      }
      .as[LinkReformatted]
    recursiveJoinBases(joinedDataset, remainingDatasets)
  }
}
  1. 您现在得到 DatasetLinkReformatted 条记录,其中 ids 字段中的每个对应 typeName -> id 对应 typeName -> Base 来源 字段中。 对我来说这就足够了。我可以在这个最终数据集上使用一些地图函数提取我需要的一切

希望对您有所帮助。我知道这不是我要问的确切解决方案,也不是很简单。