window Spark 1.6 的功能
window function for Spark 1.6
我有这个 DataFrame :
+----+------+--------------------+--------+-------------+
| id | name | end time | value | comment |
---------------------------------------------------------
|1 |node1 |2017-03-24 08:30:00 | 5 | blabla |
---------------------------------------------------------
|2 |node1 |2017-03-24 09:00:00 | 3 | blabla |
---------------------------------------------------------
|3 |node1 |2017-03-24 09:30:00 | 8 | blabla |
---------------------------------------------------------
|4 |node2 |2017-03-24 10:00:00 | 5 | blabla |
---------------------------------------------------------
|5 |node2 |2017-03-24 10:30:00 | 3 | blabla |
---------------------------------------------------------
|6 |node2 |2017-03-24 11:00:00 | 1 | blabla |
---------------------------------------------------------
|7 |node2 |2017-03-24 11:30:00 | 3 | blabla |
---------------------------------------------------------
|8 |node2 |2017-03-24 12:00:00 | 5 | blabla |
---------------------------------------------------------
而且我需要在 2 小时内找到值小于 6 的节点。我如何在 Spark 1.6 中做到这一点?
提前致谢!
编辑:这仅在 Spark 2.x
中
您可以使用 window 聚合函数:
df.groupBy(
col("name"),
window(col("end time"), "2 hour", "30 minute")
)
.agg(max("value").as("2_hour_max_value"))
.filter(col("2_hour_max_value") < 6)
.select("name")
.distinct()
在 Spark 1.6 中,窗口功能仅适用于 HiveContext。所以你必须创建 hiveContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
为了利用 hiveContext,您必须将数据帧注册为临时 table 并且 运行 使用 hiveContext 在临时 table 上使用窗口函数进行查询。
df.registerTempTable("dfTable")
val df = hiveContext.sql("""SELECT *,
row_number() over(partition by <partitionColum> order by <orderColumn> ) AS rank
FROM dfTable""")
对于仍在使用 Spark 1.6 的人们:
需要使用 hivecontext 创建数据框。否则,hiveContext.sql 将无法识别已注册的 table。这是 spark shell:
中的一个例子
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
val someRDD=....
var someDF=hiveContext.createDataFrame(someRDD)
someDF=someDF.withColumnRenamed("_1","col1")....
someDF.registerTempTable("someTbl")
val ranked=hiveContext.sql("SELECT *, row_number() OVER(PARTITION by col1,col2,col3 order by col4 desc) AS rank FROM someTbl")
val maxCountPerGroup=ranked.filter(ranked("rank")===1).drop("rank")
maxCountPerGroup.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(output)
我有这个 DataFrame :
+----+------+--------------------+--------+-------------+
| id | name | end time | value | comment |
---------------------------------------------------------
|1 |node1 |2017-03-24 08:30:00 | 5 | blabla |
---------------------------------------------------------
|2 |node1 |2017-03-24 09:00:00 | 3 | blabla |
---------------------------------------------------------
|3 |node1 |2017-03-24 09:30:00 | 8 | blabla |
---------------------------------------------------------
|4 |node2 |2017-03-24 10:00:00 | 5 | blabla |
---------------------------------------------------------
|5 |node2 |2017-03-24 10:30:00 | 3 | blabla |
---------------------------------------------------------
|6 |node2 |2017-03-24 11:00:00 | 1 | blabla |
---------------------------------------------------------
|7 |node2 |2017-03-24 11:30:00 | 3 | blabla |
---------------------------------------------------------
|8 |node2 |2017-03-24 12:00:00 | 5 | blabla |
---------------------------------------------------------
而且我需要在 2 小时内找到值小于 6 的节点。我如何在 Spark 1.6 中做到这一点? 提前致谢!
编辑:这仅在 Spark 2.x
中您可以使用 window 聚合函数:
df.groupBy(
col("name"),
window(col("end time"), "2 hour", "30 minute")
)
.agg(max("value").as("2_hour_max_value"))
.filter(col("2_hour_max_value") < 6)
.select("name")
.distinct()
在 Spark 1.6 中,窗口功能仅适用于 HiveContext。所以你必须创建 hiveContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
为了利用 hiveContext,您必须将数据帧注册为临时 table 并且 运行 使用 hiveContext 在临时 table 上使用窗口函数进行查询。
df.registerTempTable("dfTable")
val df = hiveContext.sql("""SELECT *,
row_number() over(partition by <partitionColum> order by <orderColumn> ) AS rank
FROM dfTable""")
对于仍在使用 Spark 1.6 的人们:
需要使用 hivecontext 创建数据框。否则,hiveContext.sql 将无法识别已注册的 table。这是 spark shell:
中的一个例子val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
val someRDD=....
var someDF=hiveContext.createDataFrame(someRDD)
someDF=someDF.withColumnRenamed("_1","col1")....
someDF.registerTempTable("someTbl")
val ranked=hiveContext.sql("SELECT *, row_number() OVER(PARTITION by col1,col2,col3 order by col4 desc) AS rank FROM someTbl")
val maxCountPerGroup=ranked.filter(ranked("rank")===1).drop("rank")
maxCountPerGroup.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(output)