SparkR / SQL: Count records satisfying criteria within a rolling time window using timestamps

I have a dataset structured like the df you would get from running this:

# Hourly timestamps from 2018-01-01 to 2018-01-03 (49 observations per user)
dates <- base::seq.POSIXt(from = as.POSIXlt(as.Date("2018-01-01"), format = "%Y-%m-%d"),
                          to = as.POSIXlt(as.Date("2018-01-03"), format = "%Y-%m-%d"),
                          by = "hour")
possible_statuses <- c('moving', 'stopped')
statuses4demo <- base::sample(possible_statuses, size = 98, replace = TRUE, prob = c(.75, .25))
hours_back <- 5
hours_back_milliseconds <- hours_back * 3600 * 1000

# Generate dataframe
df <- data.frame(date = rep(dates, 2),
                 user_id = c(rep("user_1", 49), rep("user_2", 49)),
                 status = statuses4demo)
df$row_id <- seq(from = 1, to = nrow(df), by = 1)
df$eventTimestamp <- as.numeric(format(df$date, "%s")) * 1000   # epoch milliseconds
df$hours_back_timestamp <- df$eventTimestamp - hours_back_milliseconds
df$num_stops_within_past_5_hours <- 0

I would like to get a data frame containing, for each row, a rolling count of the observations with status "stopped" within the preceding 5 hours. To do that in R, I just wrote a couple of nested loops, i.e. I run this:

for(i in 1:length(unique(df$user_id))){
  the_user <- unique(df$user_id)[i]
  filtered_data <- df[which(df$user_id == the_user), ]

  for(j in 1:nrow(filtered_data)){
    the_row_id <- filtered_data$row_id[j]
    the_time <- filtered_data$eventTimestamp[j]
    the_past_time <- filtered_data$hours_back_timestamp[j]
    # Count this user's "stopped" rows with timestamps in [the_past_time, the_time)
    num_stops_in_past_interval <- base::nrow(filtered_data[filtered_data$eventTimestamp >= the_past_time &
                                                             filtered_data$eventTimestamp < the_time &
                                                             filtered_data$status == "stopped", ])
    df$num_stops_within_past_5_hours[which(df$row_id == the_row_id)] <- num_stops_in_past_interval
  }
}
View(df)

I am trying to do the same thing, but either with the built-in functions in SparkR or (more likely, I think) with a SQL statement. I am wondering if anyone knows how to reproduce the output of df in a Spark context? Any help is much appreciated. Thanks in advance. --Nate

Starting with this data:

sdf<- SparkR::createDataFrame(df[, c("date", "eventTimestamp", "status", "user_id", "row_id")])
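
(This assumes an active SparkR session; if one isn't running yet, it can be started with something like the following.)

library(SparkR)
sparkR.session()   # local session by default; adjust master/config for a cluster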

This solution works for the example data as you have it set up, but it isn't a more general solution for observations with arbitrary timestamps.

ddf <- as.DataFrame(df)
ddf$count <- ifelse(ddf$status == "stopped", 1, 0)

# Create a windowSpec partitioning by user_id and ordered by date
ws <- orderBy(windowPartitionBy("user_id"), "date")

# Get the cumulative sum of the count variable by user id
ddf$count <- over(sum(ddf$count), ws)

# Get the lagged value of the cumulative sum from 5hrs ago
ddf$lag_count <- over(lag(ddf$count, offset = 5, default = 0), ws)

# The count of stops in the last 5hrs is the difference between the two
ddf$num_stops_within_past_5_hours <- ddf$count - ddf$lag_count
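
To sanity-check the result against the R loop, the Spark DataFrame can be sorted and pulled back to the driver (a quick inspection sketch):

# Collect back to a local data.frame and peek at the rolling counts
result <- collect(arrange(ddf, "user_id", "date"))
head(result[, c("user_id", "date", "status", "num_stops_within_past_5_hours")])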

Edited to add a more general solution that can handle inconsistent time breaks:

# Using a sampled version of the original df to create inconsistent time breaks
ddf <- as.DataFrame(df[base::sample(nrow(df), nrow(df) - 20), ])
ddf$count <- ifelse(ddf$status == "stopped", 1, 0)

# (the %>% pipe comes from magrittr; load it with library(magrittr) if needed)
to_join <- ddf %>%
  select("count", "eventTimestamp", "user_id") %>%
  rename(eventTimestamp_ = .$eventTimestamp, user_id_ = .$user_id)

ddf$count <- NULL

# join in each row where the event timestamp is within the interval
ddf_new <- join(ddf, to_join,
                ddf$hours_back_timestamp <= to_join$eventTimestamp_ &
                  ddf$eventTimestamp >= to_join$eventTimestamp_ &
                  ddf$user_id == to_join$user_id_,
                joinType = "left")

ddf_new <- ddf_new %>% groupBy(
   'date', 
   'eventTimestamp',
   'user_id', 
   'status', 
   'row_id', 
   'hours_back_timestamp') %>% 
   agg(num_stops_within_past_5_hours = sum(ddf_new$count))
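
If you prefer the SQL route you mentioned, the same rolling count can be expressed with a value-based RANGE window frame. This is only a sketch: it assumes the Spark DataFrame has been registered as a temp view (the name "events" is just an example), and that you are on a Spark version that supports value-based RANGE frames over a numeric ORDER BY column.

# Register the Spark DataFrame from above as a SQL temp view
createOrReplaceTempView(sdf, "events")

# 18000000 ms = 5 hours; "1 PRECEDING" excludes the current row,
# matching the R loop's [t - 5h, t) interval
rolling <- SparkR::sql("
  SELECT *,
         COALESCE(SUM(CASE WHEN status = 'stopped' THEN 1 ELSE 0 END) OVER (
           PARTITION BY user_id
           ORDER BY eventTimestamp
           RANGE BETWEEN 18000000 PRECEDING AND 1 PRECEDING
         ), 0) AS num_stops_within_past_5_hours
  FROM events")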