使用 Spark DataFrame 的地理过滤器

Geo Filter with Spark DataFrame

我是 spark 数据框的新手,它有时很奇怪。 假设我有一个包含带有纬度和经度坐标的日志的数据框。

 LogsDataFrame.printSchema :
 root
 |-- lat: double (nullable = false)
 |-- lon: double (nullable = false)
 |-- imp: string (nullable = false)
 |-- log_date: string (nullable = true)
 |-- pubuid: string (nullable = true)

另一方面我有一个简单的方法

within(lat : Double, long : Double, radius : Double) : Boolean

表明纬度和经度是否在预定义位置的特定半径内。

现在,我如何过滤不满足within的点Log。我试过了

logsDataFrame.filter(within(logsDF("lat"), logsDF("lon"), RADIUS)

但它不推断 Double 而是返回 Column 作为类型。 我怎样才能让这个工作? spark 站点中的文档有点简单,我确定我遗漏了一些东西。

感谢您的帮助。

一般来说,您至少需要两件事才能让它发挥作用。首先你必须创建一个 UDF 包装 within:

import org.apache.spark.sql.functions.{udf, lit}

val withinUDF = udf(within _)

接下来,调用UDF时,radius应该被标记为文字:

df.where(withinUDF($"lat", $"long", lit(RADIUS)))

由于并非所有类型都可以通过这种方式传递,而且创建包装器和调用 lit 相当乏味,您可能更喜欢柯里化:

def within(radius: Double) = udf((lat: Double, long: Double) => ???)

df.where(within(RADIUS)($"lat", $"long"))