How do I group by hour in SparkR?
I'm trying to aggregate some dates by hour using SparkR with Spark 2.1.0.
My data looks like this:
created_at
1 Sun Jul 31 22:25:01 +0000 2016
2 Sun Jul 31 22:25:01 +0000 2016
3 Fri Jun 03 10:16:57 +0000 2016
4 Mon May 30 19:23:55 +0000 2016
5 Sat Jun 11 21:00:07 +0000 2016
6 Tue Jul 12 16:31:46 +0000 2016
7 Sun May 29 19:12:26 +0000 2016
8 Sat Aug 06 11:04:29 +0000 2016
9 Sat Aug 06 11:04:29 +0000 2016
10 Sat Aug 06 11:04:29 +0000 2016
I would like the output to be:
Hour Count
22 2
10 1
19 1
11 3
....
I've tried:
sumdf <- summarize(groupBy(df, df$created_at), count = n(df$created_at))
head(select(sumdf, "created_at", "count"),10)
but this groups to the nearest second:
created_at count
1 Sun Jun 12 10:24:54 +0000 2016 1
2 Tue Aug 09 14:12:35 +0000 2016 2
3 Fri Jul 29 19:22:03 +0000 2016 2
4 Mon Jul 25 21:05:05 +0000 2016 2
I've tried:
sumdf <- summarize(groupBy(df, hr=hour(df$created_at)), count = n(hour(df$created_at)))
head(select(sumdf, "hour(created_at)", "count"),20)
but this gives:
hour(created_at) count
1 NA 0
I've also tried:
sumdf <- summarize(groupBy(df, df$created_at), count = n(hour(df$created_at)))
head(select(sumdf, "created_at", "count"),10)
but this gives:
created_at count
1 Sun Jun 12 10:24:54 +0000 2016 0
2 Tue Aug 09 14:12:35 +0000 2016 0
3 Fri Jul 29 19:22:03 +0000 2016 0
4 Mon Jul 25 21:05:05 +0000 2016 0
...
How can I do this with the hour function, or is there a better way?
Here is Scala code that does it; I think you can use it as a reference.
// assumes an existing SparkSession named ss
import ss.implicits._
import org.apache.spark.sql.functions.{udf, count}

val index = ss.sparkContext.parallelize(Seq(
  (1, "Sun Jul 31 22:25:01 +0000 2016"),
  (2, "Sun Jul 31 22:25:01 +0000 2016"),
  (3, "Fri Jun 03 10:16:57 +0000 2016"),
  (4, "Mon May 30 19:23:55 +0000 2016"),
  (5, "Sat Jun 11 21:00:07 +0000 2016"),
  (6, "Tue Jul 12 16:31:46 +0000 2016"),
  (7, "Sun May 29 19:12:26 +0000 2016"),
  (8, "Sat Aug 06 11:04:29 +0000 2016"),
  (9, "Sat Aug 06 11:04:29 +0000 2016"),
  (10, "Sat Aug 06 11:04:29 +0000 2016")
)).toDF("ID", "time")

// pull the hour ("HH") out of the fixed-width "EEE MMM dd HH:mm:ss Z yyyy" string
val getHour = udf((s: String) => s.substring(11, 13))

index.withColumn("hour", getHour($"time"))
  .groupBy("hour")
  .agg(count("*").as("count"))
  .show
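For reference, a rough SparkR port of the same idea (my own sketch, not part of the original answer), assuming df is already a SparkDataFrame with the question's created_at column; it uses regexp_extract instead of fixed character positions to pull the hour out of the string, then groups on it:
library(SparkR)
# grab the two digits before the first ":" as the hour
# (assumes the "EEE MMM dd HH:mm:ss Z yyyy" layout shown in the question)
withHour <- withColumn(df, "hour", regexp_extract(df$created_at, "(\\d{2}):\\d{2}:\\d{2}", 1))
sumdf <- summarize(groupBy(withHour, withHour$hour), count = n(withHour$hour))
head(select(sumdf, "hour", "count"))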
Assuming your local table is df, the real issue here is extracting the hour from your created_at column and then applying your grouping code. For that you can use dapply:
library(SparkR)
sc1 <- sparkR.session()
df2 <- createDataFrame(df)
#with dapply you need to specify the schema i.e. the data.frame that will come out
#of the applied function - i.e. substringDF in our case
schema <- structType(structField('created_at', 'string'), structField('time', 'string'))
#a function that will be applied to each partition of the spark data frame.
#remember that each partition is a data.frame itself.
substringDF <- function(DF) {
  DF$time <- substr(DF$created_at, 15, 16)
  DF
}
#and then we use the above in dapply
df3 <- dapply(df2, substringDF, schema)
head(df3)
# created_at time
#1 1 Sun Jul 31 22:25:01 +0000 2016 22
#2 2 Sun Jul 31 22:25:01 +0000 2016 22
#3 3 Fri Jun 03 10:16:57 +0000 2016 10
#4 4 Mon May 30 19:23:55 +0000 2016 19
#5 5 Sat Jun 11 21:00:07 +0000 2016 21
#6 6 Tue Jul 12 16:31:46 +0000 2016 16
Then just apply your normal grouping code:
sumdf <- summarize(groupBy(df3, df3$time), count = n(df3$time))
head(select(sumdf, "time", "count"))
# time count
#1 11 3
#2 22 2
#3 16 1
#4 19 2
#5 10 1
#6 21 1
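As a small follow-up of my own (standard SparkR functions, not part of the original answer), you can order the grouped result by count and pull it back to the driver as a local data.frame:
# sort by count, largest first, then collect the small result locally
local_counts <- collect(arrange(sumdf, desc(sumdf$count)))
local_counts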
I would parse the date with to_timestamp (Spark 2.2) or unix_timestamp %>% cast("timestamp") (earlier versions) and access hour:
df <- createDataFrame(data.frame(created_at="Sat Aug 19 12:33:26 +0000 2017"))
head(count(group_by(df,
  alias(hour(to_timestamp(column("created_at"), "EEE MMM d HH:mm:ss Z yyyy")), "hour")
)))
## hour count
## 1 14 1
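For Spark versions before 2.2, here is a minimal sketch of the unix_timestamp variant mentioned above (my own expansion, assuming the question's data is already in a SparkDataFrame named df). unix_timestamp honours the +0000 offset when parsing, and hour() is then evaluated in the session time zone, which is presumably why the output above shows 14 for a 12:33 +0000 input on a UTC+2 session:
# parse to epoch seconds, cast to timestamp, then group by the hour
# (hour() reports the session time zone, not necessarily UTC)
ts <- cast(unix_timestamp(df$created_at, "EEE MMM d HH:mm:ss Z yyyy"), "timestamp")
sumdf <- count(group_by(df, alias(hour(ts), "hour")))
head(sumdf)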