将 `filter`/`where` 有条件地应用于 Spark `Dataset`/`Dataframe`

Conditional application of `filter`/`where` to a Spark `Dataset`/`Dataframe`

大家好,我有一个函数可以从某些 S3 位置加载数据集和 returns 有趣的数据

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = {
import spark.implicits._

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  // pick rows for the given marketplaces
  .where($"mid".isin(mids: _*))
  // pick rows for the given indices
  .where($"index".isin(indices: _*))

}

如果有人提供 mids = Seq()indices = Seq(),此实现将过滤掉所有内容。另一方面,我希望语义是 "apply this where clause only if mids is not empty"(与 indices 相同),这样如果函数的用户提供空序列就不会发生过滤。

有没有很好的实用方法来做到这一点?

您可以使用短路评估,如果提供的 Seq 不为空,这应该只应用过滤器:

import org.apache.spark.sql.functions.lit

spark
    .sparkContext.textFile(s3BrowseIndex)
    // split text dataset
    .map(line => line.split("\s+"))
    // get types for attributes
    .map(BrowseIndex.strAttributesToBrowseIndex)
    // cast it to a dataset (requires implicit conversions)
    .toDS()
    // pick rows for the given marketplaces
    .where(lit(mids.isEmpty) or $"mid".isin(mids: _*))
    // pick rows for the given indices
    .where(lit(indices.isEmpty) or $"index".isin(indices: _*))

Raphael Roth 的答案对于应用过滤器的特定问题是一个不错的选择,如果您不介意稍微复杂的逻辑的话。适用于任何条件转换(不仅仅是过滤,也不仅仅是在决策分支之一上什么都不做)的通用解决方案是使用 transform,例如,

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  .transform { ds =>
    // pick rows for the given marketplaces
    if (mids.isEmpty) ds
    else ds.where($"mid".isin(mids: _*))
  }
  .transform { ds =>
    // pick rows for the given indices
    if (indices.isEmpty) ds
    else ds.where($"index".isin(indices: _*))
  }

如果您使用的是稳定类型的数据集(或数据帧,Dataset[Row]),transform 可能非常有用,因为您可以构建转换函数序列,然后应用它们:

transformations.foldLeft(ds)(_ transform _)

在许多情况下,这种方法有助于代码重用和可测试性。