在 Spark SQL 中按日期分组聚合
Aggregation with Group By date in Spark SQL
我有一个 RDD,其中包含一个名为 time 的时间戳,类型为 long:
root
|-- id: string (nullable = true)
|-- value1: string (nullable = true)
|-- value2: string (nullable = true)
|-- time: long (nullable = true)
|-- type: string (nullable = true)
我正在尝试按值 1、值 2 和时间分组为 YYYY-MM-DD。我尝试按 cast(time as Date) 分组,但随后出现以下错误:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: [1.21] failure: ``DECIMAL'' expected but identifier Date found
这是否意味着无法按日期分组?我什至尝试添加另一个级别的转换以将其作为字符串:
cast(cast(time as Date) as String)
其中returns同样的错误。
我读到我可以在 RDD 上使用 aggregateByKey,但我不明白如何将它用于几列并将那么长的长度转换为 YYYY-MM-DD 字符串。我该如何进行?
不确定这是否是你的 meant/needed 但我在 spark-sql 中遇到了与 date/timestamp 打交道的同样困难,这是我唯一想到的在时间戳中投射字符串,因为(对我而言)似乎不可能在 spark-sql.
中输入日期类型
无论如何,这是我的代码,用于完成与您的需要(可能)类似的事情(Long 代替 String):
val mySQL = sqlContext.sql("select cast(yourLong as timestamp) as time_cast" +
" ,count(1) total "+
" from logs" +
" group by cast(yourLong as timestamp)"
)
val result= mySQL.map(x=>(x(0).toString,x(1).toString))
输出是这样的:
(2009-12-18 10:09:28.0,7)
(2009-12-18 05:55:14.0,1)
(2009-12-18 16:02:50.0,2)
(2009-12-18 09:32:32.0,2)
即使我使用的是时间戳而不是日期,这对你也有用吗?
希望能帮到你
FF
编辑:
为了测试从 Long 到 Timestamp 的 "single-cast",我尝试了这个简单的更改:
val mySQL = sqlContext.sql("select cast(1430838439 as timestamp) as time_cast" +
" ,count(1) total "+
" from logs" +
" group by cast(1430838439 as timestamp)"
)
val result= mySQL.map(x=>(x(0),x(1)))
并且一切正常,结果:
(1970-01-17 14:27:18.439,4) // 4 because I have 4 rows in my table
我通过添加这个函数解决了这个问题:
def convert( time:Long ) : String = {
val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd")
return sdf.format(new java.util.Date(time))
}
并像这样将其注册到 sqlContext 中:
sqlContext.registerFunction("convert", convert _)
然后我终于可以按日期分组了:
select * from table convert(time)
我正在使用 Spark 1.4.0,因为 1.2.0 DATE
似乎出现在 Spark SQL API (SPARK-2562) 中。 DATE
应该允许您按 YYYY-MM-DD
的时间分组。
我也有类似的数据结构,其中我的 created_on
类似于您的 time
字段。
root
|-- id: long (nullable = true)
|-- value1: long (nullable = true)
|-- created_on: long (nullable = true)
我使用 FROM_UNIXTIME(created_on,'YYYY-MM-dd')
解决了它并且效果很好:
val countQuery = "SELECT FROM_UNIXTIME(created_on,'YYYY-MM-dd') as `date_created`, COUNT(*) AS `count` FROM user GROUP BY FROM_UNIXTIME(created_on,'YYYY-MM-dd')"
从这里开始,您可以执行正常操作,将查询执行到数据帧中等等。
FROM_UNIXTIME
起作用可能是因为我的 Spark 安装中包含 Hive,它是 Hive UDF. However it will be included as part of the Spark SQL native syntax in future releases (SPARK-8175)。
我有一个 RDD,其中包含一个名为 time 的时间戳,类型为 long:
root
|-- id: string (nullable = true)
|-- value1: string (nullable = true)
|-- value2: string (nullable = true)
|-- time: long (nullable = true)
|-- type: string (nullable = true)
我正在尝试按值 1、值 2 和时间分组为 YYYY-MM-DD。我尝试按 cast(time as Date) 分组,但随后出现以下错误:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: [1.21] failure: ``DECIMAL'' expected but identifier Date found
这是否意味着无法按日期分组?我什至尝试添加另一个级别的转换以将其作为字符串:
cast(cast(time as Date) as String)
其中returns同样的错误。
我读到我可以在 RDD 上使用 aggregateByKey,但我不明白如何将它用于几列并将那么长的长度转换为 YYYY-MM-DD 字符串。我该如何进行?
不确定这是否是你的 meant/needed 但我在 spark-sql 中遇到了与 date/timestamp 打交道的同样困难,这是我唯一想到的在时间戳中投射字符串,因为(对我而言)似乎不可能在 spark-sql.
中输入日期类型无论如何,这是我的代码,用于完成与您的需要(可能)类似的事情(Long 代替 String):
val mySQL = sqlContext.sql("select cast(yourLong as timestamp) as time_cast" +
" ,count(1) total "+
" from logs" +
" group by cast(yourLong as timestamp)"
)
val result= mySQL.map(x=>(x(0).toString,x(1).toString))
输出是这样的:
(2009-12-18 10:09:28.0,7)
(2009-12-18 05:55:14.0,1)
(2009-12-18 16:02:50.0,2)
(2009-12-18 09:32:32.0,2)
即使我使用的是时间戳而不是日期,这对你也有用吗?
希望能帮到你
FF
编辑: 为了测试从 Long 到 Timestamp 的 "single-cast",我尝试了这个简单的更改:
val mySQL = sqlContext.sql("select cast(1430838439 as timestamp) as time_cast" +
" ,count(1) total "+
" from logs" +
" group by cast(1430838439 as timestamp)"
)
val result= mySQL.map(x=>(x(0),x(1)))
并且一切正常,结果:
(1970-01-17 14:27:18.439,4) // 4 because I have 4 rows in my table
我通过添加这个函数解决了这个问题:
def convert( time:Long ) : String = {
val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd")
return sdf.format(new java.util.Date(time))
}
并像这样将其注册到 sqlContext 中:
sqlContext.registerFunction("convert", convert _)
然后我终于可以按日期分组了:
select * from table convert(time)
我正在使用 Spark 1.4.0,因为 1.2.0 DATE
似乎出现在 Spark SQL API (SPARK-2562) 中。 DATE
应该允许您按 YYYY-MM-DD
的时间分组。
我也有类似的数据结构,其中我的 created_on
类似于您的 time
字段。
root
|-- id: long (nullable = true)
|-- value1: long (nullable = true)
|-- created_on: long (nullable = true)
我使用 FROM_UNIXTIME(created_on,'YYYY-MM-dd')
解决了它并且效果很好:
val countQuery = "SELECT FROM_UNIXTIME(created_on,'YYYY-MM-dd') as `date_created`, COUNT(*) AS `count` FROM user GROUP BY FROM_UNIXTIME(created_on,'YYYY-MM-dd')"
从这里开始,您可以执行正常操作,将查询执行到数据帧中等等。
FROM_UNIXTIME
起作用可能是因为我的 Spark 安装中包含 Hive,它是 Hive UDF. However it will be included as part of the Spark SQL native syntax in future releases (SPARK-8175)。