使用 RevoScaleR 每组行数
Number rows per group with RevoScaleR
我正在转换本地 R 脚本以使用 Revolution-R(又名 Microsoft R Client/Server)包中的 RevoScaleR
函数。这样可以更好地扩展大量数据。
目标是创建一个新列,为每个组的行编号。使用 data.table
这将通过以下代码实现:
library(data.table)
eventlog[,ActivityNumber := seq(from=1, to=.N, by=1), by=Case.ID]
出于说明目的,输出如下所示:
Case.ID ActivityNumber
1 A 1
2 A 2
3 B 1
4 C 1
5 C 2
6 C 3
在使用 rx
函数进行一些研究后,我找到了包 dplyrXdf
,它基本上是一个包装器,用于在 Xdf
stored 上使用 dplyr
函数数据,同时仍受益于 RevoScaleR
的优化功能(参见 http://blog.revolutionanalytics.com/2015/10/using-the-dplyrxdf-package.html)
在我的例子中,这将导致以下结果:
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = seq_len(n()))
但是,这会导致以下错误:
ERROR: Attempting to add a variable without a name to an analysis.
Caught exception in file: CxAnalysis.cpp, line: 3756. ThreadID: 1248 Rethrowing.
Caught exception in file: CxAnalysis.cpp, line: 5249. ThreadID: 1248 Rethrowing.
Error in doTryCatch(return(expr), name, parentenv, handler) :
Error in executing R code: ERROR: Attempting to add a variable without a name to an analysis.
有什么办法可以解决这个错误吗?或者其他(更好的?)方法来获得请求的结果?
dplyr
和 dplyrXdf
有一个 tally
方法来计算每组的项目:
result <- eventlog %>%
group_by(Case.ID) %>%
tally()
如果您想做的不仅仅是将每组的记录制成表格,您可以使用汇总(因为您没有显示数据,我使用了一个名为 delay 的假设列,我假设它是数字用于说明目的):
result <- eventlog %>%
group_by(Case.ID) %>%
summarize(counts = n(),
ave_delay = mean(delay))
您可以使用常规 RevoScaleR
函数执行上述操作,
rxCrossTabs(~ Case.ID, data = eventlog)
第二个例子:
rxCube(delay ~ Case.ID, data = eventlog)
我不确定为什么会这样,但请尝试使用 seq_along(Case.ID)
而不是 seq_len(n())
:
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = seq_along(Case.ID))
n()
似乎有问题。这是我的探索性代码,以防其他人想要试验:
options(stringsAsFactors = FALSE)
library(dplyrXdf)
# Set up some test data
eventlog_df <- data.frame(Case.ID = c("A", "A", "A", "A", "A", "B", "C", "C", "C"))
# Add a variable for artificially splitting the XDF into small chunks
eventlog_df$Chunk.ID <- factor((seq_len(nrow(eventlog_df)) + 2) %/% 3)
# Check the results
eventlog_df
# Now read it into an XDF file. I'm going to read just three rows in at a time
# so that the XDF file has several chunks, so we can be confident this works
# across chunks
eventlog <- tempfile(fileext = ".xdf")
for(i in 1:3) {
rxImport(inData = eventlog_df[eventlog_df$Chunk.ID %in% i, ],
outFile = eventlog,
colInfo = list(Case.ID = list(type = "factor",
levels = c("A", "B", "C"))),
append = file.exists(eventlog))
}
# Convert to a proper data source
eventlog <- RxXdfData(eventlog)
rxGetInfo(eventlog, getVarInfo = TRUE, numRows = 10)
# Now to dplyr. First, let's make sure it can count up the records
# in each group without any trouble.
result <- eventlog %>%
group_by(Case.ID) %>%
summarise(ActivityNumber = n())
# It can:
rxDataStep(result)
# Now if we switch to mutate, does n() still work?
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = n())
# No - and it seems to be complaining about missing variables. So what if
# we try to refer to a variable we *know* exists?
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = seq_along(Case.ID))
# It works
rxDataStep(result)
感谢@Matt-parker 指出我这个问题。
请注意,n()
不是常规的 R 函数,尽管它看起来像一个。它需要为每个数据源专门实现,也可能分别为每个 mutate
、summarise
和 filter
.
目前,xdf 文件支持的 n
的唯一用法是在 summarise
内,以计算行数。为其他动词实施它实际上很重要。
特别是 Matt 使用 seq_along
来实现 n
的功能时存在问题。请记住,xdf 文件是块结构的:每个行块都独立于其他块读取和处理。这意味着生成的序列 仅针对该行块 ,而 而不是 对组中的所有行。如果一组跨越多个块,序列号将在中间重新开始。
获得正确序列号的方法是记录 运行 您为该组读入的行数,并在每次处理一个块时更新它。您可以使用 transformFunc
执行此操作,您通过 .rxArgs
参数将其传递给 transmute
:
ev <- eventlog %>% group_by(Case.ID) %>% transmute(.rxArgs = list(
transformFunc = function(varList) {
n <- .n + seq_along(varList[[1]])
if(!.rxIsTestChunk) # need this b/c rxDataStep does a test run on the 1st 10 rows
.n <<- n[length(n)]
list(n=n)
},
transformObjects = list(.n = 0))
这应该适用于 local
、localpar
和 foreach
计算上下文。在您无法保证 rxDataStep 将以确定的顺序处理行的任何上下文中,它可能不起作用(或者至少不会给出可重现的结果)——因此 Mapreduce、Spark、Teradata 或类似的。
我正在转换本地 R 脚本以使用 Revolution-R(又名 Microsoft R Client/Server)包中的 RevoScaleR
函数。这样可以更好地扩展大量数据。
目标是创建一个新列,为每个组的行编号。使用 data.table
这将通过以下代码实现:
library(data.table)
eventlog[,ActivityNumber := seq(from=1, to=.N, by=1), by=Case.ID]
出于说明目的,输出如下所示:
Case.ID ActivityNumber
1 A 1
2 A 2
3 B 1
4 C 1
5 C 2
6 C 3
在使用 rx
函数进行一些研究后,我找到了包 dplyrXdf
,它基本上是一个包装器,用于在 Xdf
stored 上使用 dplyr
函数数据,同时仍受益于 RevoScaleR
的优化功能(参见 http://blog.revolutionanalytics.com/2015/10/using-the-dplyrxdf-package.html)
在我的例子中,这将导致以下结果:
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = seq_len(n()))
但是,这会导致以下错误:
ERROR: Attempting to add a variable without a name to an analysis.
Caught exception in file: CxAnalysis.cpp, line: 3756. ThreadID: 1248 Rethrowing.
Caught exception in file: CxAnalysis.cpp, line: 5249. ThreadID: 1248 Rethrowing.
Error in doTryCatch(return(expr), name, parentenv, handler) :
Error in executing R code: ERROR: Attempting to add a variable without a name to an analysis.
有什么办法可以解决这个错误吗?或者其他(更好的?)方法来获得请求的结果?
dplyr
和 dplyrXdf
有一个 tally
方法来计算每组的项目:
result <- eventlog %>%
group_by(Case.ID) %>%
tally()
如果您想做的不仅仅是将每组的记录制成表格,您可以使用汇总(因为您没有显示数据,我使用了一个名为 delay 的假设列,我假设它是数字用于说明目的):
result <- eventlog %>%
group_by(Case.ID) %>%
summarize(counts = n(),
ave_delay = mean(delay))
您可以使用常规 RevoScaleR
函数执行上述操作,
rxCrossTabs(~ Case.ID, data = eventlog)
第二个例子:
rxCube(delay ~ Case.ID, data = eventlog)
我不确定为什么会这样,但请尝试使用 seq_along(Case.ID)
而不是 seq_len(n())
:
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = seq_along(Case.ID))
n()
似乎有问题。这是我的探索性代码,以防其他人想要试验:
options(stringsAsFactors = FALSE)
library(dplyrXdf)
# Set up some test data
eventlog_df <- data.frame(Case.ID = c("A", "A", "A", "A", "A", "B", "C", "C", "C"))
# Add a variable for artificially splitting the XDF into small chunks
eventlog_df$Chunk.ID <- factor((seq_len(nrow(eventlog_df)) + 2) %/% 3)
# Check the results
eventlog_df
# Now read it into an XDF file. I'm going to read just three rows in at a time
# so that the XDF file has several chunks, so we can be confident this works
# across chunks
eventlog <- tempfile(fileext = ".xdf")
for(i in 1:3) {
rxImport(inData = eventlog_df[eventlog_df$Chunk.ID %in% i, ],
outFile = eventlog,
colInfo = list(Case.ID = list(type = "factor",
levels = c("A", "B", "C"))),
append = file.exists(eventlog))
}
# Convert to a proper data source
eventlog <- RxXdfData(eventlog)
rxGetInfo(eventlog, getVarInfo = TRUE, numRows = 10)
# Now to dplyr. First, let's make sure it can count up the records
# in each group without any trouble.
result <- eventlog %>%
group_by(Case.ID) %>%
summarise(ActivityNumber = n())
# It can:
rxDataStep(result)
# Now if we switch to mutate, does n() still work?
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = n())
# No - and it seems to be complaining about missing variables. So what if
# we try to refer to a variable we *know* exists?
result <- eventlog %>%
group_by(Case.ID) %>%
mutate(ActivityNumber = seq_along(Case.ID))
# It works
rxDataStep(result)
感谢@Matt-parker 指出我这个问题。
请注意,n()
不是常规的 R 函数,尽管它看起来像一个。它需要为每个数据源专门实现,也可能分别为每个 mutate
、summarise
和 filter
.
目前,xdf 文件支持的 n
的唯一用法是在 summarise
内,以计算行数。为其他动词实施它实际上很重要。
特别是 Matt 使用 seq_along
来实现 n
的功能时存在问题。请记住,xdf 文件是块结构的:每个行块都独立于其他块读取和处理。这意味着生成的序列 仅针对该行块 ,而 而不是 对组中的所有行。如果一组跨越多个块,序列号将在中间重新开始。
获得正确序列号的方法是记录 运行 您为该组读入的行数,并在每次处理一个块时更新它。您可以使用 transformFunc
执行此操作,您通过 .rxArgs
参数将其传递给 transmute
:
ev <- eventlog %>% group_by(Case.ID) %>% transmute(.rxArgs = list(
transformFunc = function(varList) {
n <- .n + seq_along(varList[[1]])
if(!.rxIsTestChunk) # need this b/c rxDataStep does a test run on the 1st 10 rows
.n <<- n[length(n)]
list(n=n)
},
transformObjects = list(.n = 0))
这应该适用于 local
、localpar
和 foreach
计算上下文。在您无法保证 rxDataStep 将以确定的顺序处理行的任何上下文中,它可能不起作用(或者至少不会给出可重现的结果)——因此 Mapreduce、Spark、Teradata 或类似的。