在 SparkR 的 DataFrame 中按时间戳过滤行

Filter rows by timestamp in DataFrame of SparkR

我想按时间戳过滤 SparkR 中 DataFrame 的行,格式如下:

df <- createDataFrame(sqlContext, data.frame(ID = c(1,2,3),
                                             Timestamp=c('08/01/2014 11:18:30',
                                                         '01/01/2015 12:13:45',
                                                         '05/01/2015 14:17:33')))

请注意 TimeStamp 列的原始架构是 String。假设我想过滤 03/01/2015 00:00:00 之前的那些时间戳,我认为可能有两种方法可以做到这一点:

一种是将列突变为 timestamp,就像使用 dplyrlubridate:

的普通 R 一样
df %>%
 mutate(Timestamp = mdy_hms(Timestamp)) %>%
 filter(Timestamp < mdy_hms('03/01/2015 00:00:00'))

但是我未能改变 DataFrame 的列,因为它是 S4 class Column 而不是向量。

第二种方法可能是将 DataFrame 注册为 table,然后使用 SparkSQL 来处理 timestamp 类型:

df <- createDataFrame(sqlContext, data.frame(ID = c(1,2,3),
                                             Timestamp=c('08/01/2014 11:18:30',
                                                         '01/01/2015 12:13:45',
                                                         '05/01/2015 14:17:33')))
registerTempTable(df, 'df')
head(sql(sqlContext, 'SELECT * FROM df WHERE Timestamp < "03/01/2015 00:00:00"'))

但是因为它仍然是一个字符串比较所以它会给出错误的结果。这样做的正确方法是什么?

Spark 1.6+

您应该能够使用 unix_timestamp 函数和标准 SQLContext:

ts <- unix_timestamp(df$Timestamp, 'MM/dd/yyyy HH:mm:ss') %>%
  cast("timestamp")

df %>% 
   where(ts <  cast(lit("2015-03-01 00:00:00"), "timestamp"))

火花 < 1.

这应该可以解决问题:

sqlContext <- sparkRHive.init(sc)

query <- "SELECT * FROM df
    WHERE unix_timestamp(Timestamp, 'MM/dd/yyyy HH:mm:ss') < 
          unix_timestamp('2015-03-01 00:00:00')" # yyyy-MM-dd HH:mm:ss 

df <- createDataFrame(sqlContext, ...)
registerTempTable(df, 'df')

head(sql(sqlContext, query))

##   ID           Timestamp
## 1  1 08/01/2014 11:18:30
## 2  2 01/01/2015 12:13:45

请注意,上下文的选择在这里很重要。由于 unix_timestamp 是一个 Hive UDF 标准 SQLContext 你在 SparkR 中默认得到的在这里是行不通的。