window Spark 1.6 的功能

window function for Spark 1.6

我有这个 DataFrame :

+----+------+--------------------+--------+-------------+ 
| id | name |    end time        |  value |    comment  |
---------------------------------------------------------
|1   |node1 |2017-03-24 08:30:00 |    5   | blabla      |
---------------------------------------------------------
|2   |node1 |2017-03-24 09:00:00 |    3   | blabla      |
---------------------------------------------------------
|3   |node1 |2017-03-24 09:30:00 |    8   | blabla      |
---------------------------------------------------------
|4   |node2 |2017-03-24 10:00:00 |    5   | blabla      |
---------------------------------------------------------
|5   |node2 |2017-03-24 10:30:00 |    3  | blabla      |
---------------------------------------------------------
|6   |node2 |2017-03-24 11:00:00 |    1   | blabla      |
---------------------------------------------------------
|7   |node2 |2017-03-24 11:30:00 |    3   | blabla      |
---------------------------------------------------------
|8   |node2 |2017-03-24 12:00:00 |    5  | blabla      |
---------------------------------------------------------

而且我需要在 2 小时内找到值小于 6 的节点。我如何在 Spark 1.6 中做到这一点? 提前致谢!

编辑:这仅在 Spark 2.x

您可以使用 window 聚合函数:

df.groupBy(
    col("name"),
    window(col("end time"), "2 hour", "30 minute")
)
.agg(max("value").as("2_hour_max_value"))
.filter(col("2_hour_max_value") < 6)
.select("name")
.distinct()

在 Spark 1.6 中,窗口功能仅适用于 HiveContext。所以你必须创建 hiveContext.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

为了利用 hiveContext,您必须将数据帧注册为临时 table 并且 运行 使用 hiveContext 在临时 table 上使用窗口函数进行查询。

df.registerTempTable("dfTable")


val df = hiveContext.sql("""SELECT *,
                                row_number() over(partition by <partitionColum> order by <orderColumn> ) AS rank
                         FROM dfTable""")
                     

对于仍在使用 Spark 1.6 的人们:

需要使用 hivecontext 创建数据框。否则,hiveContext.sql 将无法识别已注册的 table。这是 spark shell:

中的一个例子
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
val someRDD=....
var someDF=hiveContext.createDataFrame(someRDD)
someDF=someDF.withColumnRenamed("_1","col1")....
someDF.registerTempTable("someTbl")
val ranked=hiveContext.sql("SELECT *, row_number() OVER(PARTITION by col1,col2,col3 order by col4 desc) AS rank FROM someTbl")
val maxCountPerGroup=ranked.filter(ranked("rank")===1).drop("rank")
maxCountPerGroup.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(output)