How to get the hourly average of occurrences of an event using a TimeStamp/Date list in Scala
Sample timestamp data:
2018-04-07 07:07:17
2018-04-07 07:32:27
2018-04-07 08:36:44
2018-04-07 08:38:00
2018-04-07 08:39:29
2018-04-08 01:43:08
2018-04-08 01:43:55
2018-04-09 07:52:31
2018-04-09 07:52:42
2019-01-24 11:52:31
2019-01-24 12:52:42
2019-01-25 12:52:42
Expected output:
(2+3+2+2+1+3)/6 = 1.66
I also have to do this weekly and monthly, but I can extrapolate that from the hourly logic.
import java.time.{ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer

// Iterate over each entry of a group
def update(buffer: MutableAggregationBuffer, input: Row) = {
  val dateString = input(0).toString()
  val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.S")
  val zdt = ZonedDateTime.parse(dateString, dtf.withZone(ZoneId.systemDefault))
  // zdt is a ZonedDateTime here, so all of its methods are available
}
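As a side illustration (a minimal, self-contained sketch, not part of the original post), one way to turn such a parsed value into a per-hour key is truncatedTo; the helper name hourKey and the pattern without fractional seconds are assumptions made for this example:

import java.time.{ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper: parse a "yyyy-MM-dd HH:mm:ss" string and collapse it
// to the start of its hour, e.g. 2018-04-07 07:07:17 -> 2018-04-07T07:00+05:30[IST]
def hourKey(dateString: String): ZonedDateTime = {
  val dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  ZonedDateTime
    .parse(dateString, dtf.withZone(ZoneId.systemDefault))
    .truncatedTo(ChronoUnit.HOURS)
}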
I tried to solve it with the following approach -
Please note that the code runs in IST (GMT+5:30), so the timestamps 2018-04-07 07:07:17 and 2018-04-07 07:32:27 end up in different hour buckets (the first in 6:30 - 7:30 and the second in 7:30 - 8:30).
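To make that concrete, here is a small worked check (an illustration added here, assuming the session time zone is Asia/Kolkata) that reproduces the bucket ids appearing in the output further down:

import java.time.{ZoneId, ZonedDateTime}

// Assumption for this illustration: the session time zone is Asia/Kolkata (IST, UTC+05:30).
val ist = ZoneId.of("Asia/Kolkata")
val t1 = ZonedDateTime.of(2018, 4, 7, 7, 7, 17, 0, ist).toEpochSecond   // 1523065037
val t2 = ZonedDateTime.of(2018, 4, 7, 7, 32, 27, 0, ist).toEpochSecond  // 1523066547

println(t1 / 3600)  // 423073 -> the 01:00-02:00 UTC (06:30-07:30 IST) bucket
println(t2 / 3600)  // 423074 -> the 02:00-03:00 UTC (07:30-08:30 IST) bucket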
Code
Read the data
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.functions.{avg, col, floor, unix_timestamp}

val spark = sqlContext.sparkSession
val implicits = spark.implicits
import implicits._
val data =
"""
|2018-04-07 07:07:17
|2018-04-07 07:32:27
|2018-04-07 08:36:44
|2018-04-07 08:38:00
|2018-04-07 08:39:29
|2018-04-08 01:43:08
|2018-04-08 01:43:55
|2018-04-09 07:52:31
|2018-04-09 07:52:42
|2019-01-24 11:52:31
|2019-01-24 12:52:42
|2019-01-25 12:52:42
""".stripMargin
val df = spark.read
.schema(StructType(Array(StructField("date_time", DataTypes.TimestampType))))
.csv(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()
Result -
+-------------------+
|date_time |
+-------------------+
|2018-04-07 07:07:17|
|2018-04-07 07:32:27|
|2018-04-07 08:36:44|
|2018-04-07 08:38:00|
|2018-04-07 08:39:29|
|2018-04-08 01:43:08|
|2018-04-08 01:43:55|
|2018-04-09 07:52:31|
|2018-04-09 07:52:42|
|2019-01-24 11:52:31|
|2019-01-24 12:52:42|
|2019-01-25 12:52:42|
+-------------------+
root
|-- date_time: timestamp (nullable = true)
Bucketize the data and find the count for each hour
val hour = 60 * 60
// convert the time into unix epoch
val processedDF = df.withColumn("unix_epoch", unix_timestamp(col("date_time")))
.withColumn("hour_bucket", floor(col("unix_epoch")/hour))
.groupBy("hour_bucket")
.count()
processedDF.show(false)
Result -
+-----------+-----+
|hour_bucket|count|
+-----------+-----+
|423073 |1 |
|423074 |1 |
|423075 |3 |
|423092 |2 |
|423122 |2 |
|430087 |1 |
|430086 |1 |
|430111 |1 |
+-----------+-----+
Find the hourly average
// average count
processedDF.agg(avg("count")).show(false)
Result -
+----------+
|avg(count)|
+----------+
|1.5 |
+----------+
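Since the question also asks for weekly and monthly figures, here is a hedged sketch (not part of the original answer) of how the same bucketing idea could extend; the column names week_bucket and month_bucket are made up for the example:

import org.apache.spark.sql.functions.{avg, col, date_format, floor, unix_timestamp}

// Weekly: the same epoch-bucket trick with a 7-day window.
// Note that epoch week 0 starts on a Thursday (1970-01-01), so buckets run Thursday to Thursday.
val week = 7 * 24 * 60 * 60
val weeklyAvg = df
  .withColumn("week_bucket", floor(unix_timestamp(col("date_time")) / week))
  .groupBy("week_bucket").count()
  .agg(avg("count"))

// Monthly: calendar months have unequal lengths, so bucket on the formatted "yyyy-MM" value instead.
val monthlyAvg = df
  .withColumn("month_bucket", date_format(col("date_time"), "yyyy-MM"))
  .groupBy("month_bucket").count()
  .agg(avg("count"))

weeklyAvg.show(false)
monthlyAvg.show(false)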
Hope this helps!