spark sql 动态过滤条件

spark sql dynamic filter condition

如何在 spark sql 中动态构建布尔过滤条件? 拥有:

val d = Seq(1, 2, 3, 5, 6).toDF
d.filter(col("value") === 1 or col("value") === 3).show

如何动态复制:

val desiredThings = Seq(1,3)

我尝试构建过滤器:

val myCondition = desiredThings.map(col("value") === _)
d.filter(myCondition).show

但失败:

overloaded method value filter with alternatives:
org.apache.spark.api.java.function.FilterFunction[org.apache.spark.sql.Row]
 cannot be applied to (Seq[org.apache.spark.sql.Column])

执行时

d.filter(myCondition).show

同样在尝试向左折叠时:

val myCondition = desiredThings.foldLeft()((result, entry) => result && col(c.columnCounterId) === entry)

我有编译错误。

如何调整代码以动态生成过滤谓词?

只需使用isin:

d.filter(col("value").isin(desiredThings: _*))

但是如果你真的想要foldLeft你必须提供基本条件:

d.filter(desiredThings.foldLeft(lit(false))(
  (acc, x) => (acc || col("value") === (x)))
)

或者,要与 filter 或 where 一起使用,您可以使用以下方法生成 SQL 表达式:

val filterExpr = desiredThings.map( v => s"value = $v").mkString(" or ")

然后像

一样使用它
d.filter(filterExpr).show
// or
d.where(filterExpr).show

//+-----+
//|value|
//+-----+
//|    1|
//|    3|
//+-----+