使用 RevoScaleR 每组行数

Number rows per group with RevoScaleR

我正在转换本地 R 脚本以使用 Revolution-R(又名 Microsoft R Client/Server)包中的 RevoScaleR 函数。这样可以更好地扩展大量数据。

目标是创建一个新列,为每个组的行编号。使用 data.table 这将通过以下代码实现:

library(data.table)
eventlog[,ActivityNumber := seq(from=1, to=.N, by=1), by=Case.ID]

出于说明目的,输出如下所示:

    Case.ID    ActivityNumber
1       A              1
2       A              2
3       B              1
4       C              1
5       C              2
6       C              3

在使用 rx 函数进行一些研究后,我找到了包 dplyrXdf,它基本上是一个包装器,用于在 Xdfstored 上使用 dplyr 函数数据,同时仍受益于 RevoScaleR 的优化功能(参见 http://blog.revolutionanalytics.com/2015/10/using-the-dplyrxdf-package.html

在我的例子中,这将导致以下结果:

result <- eventlog %>%
  group_by(Case.ID) %>%
  mutate(ActivityNumber = seq_len(n()))

但是,这会导致以下错误:

ERROR: Attempting to add a variable without a name to an analysis.
Caught exception in file: CxAnalysis.cpp, line: 3756. ThreadID: 1248 Rethrowing.
Caught exception in file: CxAnalysis.cpp, line: 5249. ThreadID: 1248 Rethrowing.
Error in doTryCatch(return(expr), name, parentenv, handler) : 
  Error in executing R code: ERROR: Attempting to add a variable without a name to an analysis.

有什么办法可以解决这个错误吗?或者其他(更好的?)方法来获得请求的结果?

dplyrdplyrXdf 有一个 tally 方法来计算每组的项目:

result <- eventlog %>%
  group_by(Case.ID) %>%
  tally()

如果您想做的不仅仅是将每组的记录制成表格,您可以使用汇总(因为您没有显示数据,我使用了一个名为 delay 的假设列,我假设它是数字用于说明目的):

result <- eventlog %>%
  group_by(Case.ID) %>%
  summarize(counts = n(),
            ave_delay = mean(delay))

您可以使用常规 RevoScaleR 函数执行上述操作,

rxCrossTabs(~ Case.ID, data = eventlog)

第二个例子:

rxCube(delay ~ Case.ID, data = eventlog)

我不确定为什么会这样,但请尝试使用 seq_along(Case.ID) 而不是 seq_len(n()):

result <- eventlog %>%
  group_by(Case.ID) %>%
  mutate(ActivityNumber = seq_along(Case.ID))

n() 似乎有问题。这是我的探索性代码,以防其他人想要试验:

options(stringsAsFactors = FALSE)

library(dplyrXdf)

# Set up some test data
eventlog_df <- data.frame(Case.ID = c("A", "A", "A", "A", "A", "B", "C", "C", "C"))

# Add a variable for artificially splitting the XDF into small chunks
eventlog_df$Chunk.ID <- factor((seq_len(nrow(eventlog_df)) + 2) %/% 3)

# Check the results
eventlog_df


# Now read it into an XDF file. I'm going to read just three rows in at a time
# so that the XDF file has several chunks, so we can be confident this works
# across chunks

eventlog <- tempfile(fileext = ".xdf")

for(i in 1:3) {
    rxImport(inData = eventlog_df[eventlog_df$Chunk.ID %in% i, ],
             outFile = eventlog,
             colInfo = list(Case.ID = list(type = "factor", 
                                           levels = c("A", "B", "C"))),
             append = file.exists(eventlog))
}

# Convert to a proper data source
eventlog <- RxXdfData(eventlog)

rxGetInfo(eventlog, getVarInfo = TRUE, numRows = 10)


# Now to dplyr. First, let's make sure it can count up the records
# in each group without any trouble.
result <- eventlog %>%
  group_by(Case.ID) %>%
  summarise(ActivityNumber = n())

# It can:
rxDataStep(result)


# Now if we switch to mutate, does n() still work?
result <- eventlog %>%
  group_by(Case.ID) %>%
  mutate(ActivityNumber = n())

# No - and it seems to be complaining about missing variables. So what if
# we try to refer to a variable we *know* exists?
result <- eventlog %>%
  group_by(Case.ID) %>%
  mutate(ActivityNumber = seq_along(Case.ID))

# It works
rxDataStep(result)

感谢@Matt-parker 指出我这个问题。

请注意,n() 不是常规的 R 函数,尽管它看起来像一个。它需要为每个数据源专门实现,也可能分别为每个 mutatesummarisefilter.

目前,xdf 文件支持的 n 的唯一用法是在 summarise 内,以计算行数。为其他动词实施它实际上很重要。

特别是 Matt 使用 seq_along 来实现 n 的功能时存在问题。请记住,xdf 文件是块结构的:每个行块都独立于其他块读取和处理。这意味着生成的序列 仅针对该行块 ,而 而不是 对组中的所有行。如果一组跨越多个块,序列号将在中间重新开始。

获得正确序列号的方法是记录 运行 您为该组读入的行数,并在每次处理一个块时更新它。您可以使用 transformFunc 执行此操作,您通过 .rxArgs 参数将其传递给 transmute

ev <- eventlog %>% group_by(Case.ID) %>% transmute(.rxArgs = list(
    transformFunc = function(varList) {
        n <- .n + seq_along(varList[[1]])
        if(!.rxIsTestChunk)  # need this b/c rxDataStep does a test run on the 1st 10 rows
            .n <<- n[length(n)]
        list(n=n)
    },
    transformObjects = list(.n = 0))

这应该适用于 locallocalparforeach 计算上下文。在您无法保证 rxDataStep 将以确定的顺序处理行的任何上下文中,它可能不起作用(或者至少不会给出可重现的结果)——因此 Mapreduce、Spark、Teradata 或类似的。