在 Spark 中使用窗口函数

Using windowing functions in Spark

我正在尝试在 Spark 数据帧中使用 rowNumber。我的查询在 Spark shell 中按预期工作。但是当我在 eclipse 中写出它们并编译一个 jar 时,我遇到了一个错误

 16/03/23 05:52:43 ERROR ApplicationMaster: User class threw exception:org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

我的查询

import org.apache.spark.sql.functions.{rowNumber, max, broadcast}
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"id").orderBy($"value".desc)

val dfTop = df.withColumn("rn", rowNumber.over(w)).where($"rn" <= 3).drop("rn")

在 Spark shell 中 运行 查询时,我没有使用 HiveContext。不知道为什么当我 运行 与 jar 文件相同时返回错误。如果有帮助,我也会 运行 在 Spark 1.6.0 上编写脚本。有人遇到过类似的问题吗?

我之前已经回答过了。错误信息说明了一切。使用 spark < version 2.x,您需要在应用程序 jar 中添加一个 HiveContext,别无他法。

您可以进一步阅读 SQLContext 和 HiveContext 之间的区别。

SparkSQL 有一个 SQLContext 和一个 HiveContextHiveContextSQLContext 的超集。 Spark 社区建议使用 HiveContext。您可以看到,当您 运行 spark-shell,这是您的交互式驱动程序应用程序时,它会自动创建一个定义为 sc 的 SparkContext 和一个定义为 [=20 的 HiveContext =]. HiveContext 允许您执行 SQL 查询以及 Hive 命令。

您可以尝试检查您的 spark-shell 内部:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_74)

scala> sqlContext.isInstanceOf[org.apache.spark.sql.hive.HiveContext]
res0: Boolean = true

scala> sqlContext.isInstanceOf[org.apache.spark.sql.SQLContext]
res1: Boolean = true

scala> sqlContext.getClass.getName
res2: String = org.apache.spark.sql.hive.HiveContext

通过继承,HiveContext 实际上是一个 SQLContext,但反过来就不是这样了。如果您更想知道 HiveContext 如何继承自 SQLContext,您可以查看 source code

spark 2.0 开始,您只需创建一个 SparkSession(作为单一入口点)来删除 HiveContext/SQLContext 混淆问题。

对于Spark 2.0,建议使用SparkSession作为单一入口点。它消除了 HiveContext/SqlContext 混淆问题。

import org.apache.spark.sql.SparkSession
val session = SparkSession.builder
    .master("local")
    .appName("application name")
    .getOrCreate()

查看此 databricks article 了解如何使用它。