R用并行代码替换数据框中的值
R replacing values in dataframe, with paralellised code
我有一个结构如下的大型数据框:
vals
idx v
1 1 3
2 2 2
3 3 0
4 4 2
5 5 0
6 6 0
7 7 0
.
.
.
我需要通过以下方式将这个数据框的内容放入一个csv文件中:
我需要按以下步骤遍历 'idx' 列,例如 2,并且每隔一个 idx 值,需要相应行中的 'v' 值和下一个 2 'v'低于此值。
因此采用上述示例数据框的前 7 行:
> d=data.frame()
> temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
> for(i in temp){d=rbind(d,c(vals[which(vals[,1]==i)[1],1],vals[which(vals[,1]>=i & vals[,1]<=i+2),2]))}
> d
X1 X3 X2 X0
1 1 3 2 0
2 3 0 2 0
3 5 0 0 0
上面的代码给了我想要的。然而,实际上我正在使用的 'vals' 数据框非常大,这需要花费不可行的时间来处理......
我正在尝试为上述代码的并行化版本找到一个可行的解决方案:
> d=data.frame()
> temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
> put_it=function(i){d=rbind(d,c(vals[which(vals[,1]==i)[1],1],vals[which(vals[,1]>=i & vals[,1]<=i+2),2]))}
> mclapply(temp,put_it,mc.cores = detectCores()
[[1]]
X1 X3 X2 X0
1 1 3 2 0
[[2]]
X3 X0 X2 X0.1
1 3 0 2 0
[[3]]
X5 X0 X0.1 X0.2
1 5 0 0 0
因此每次都会重置 'd' 数据帧,这并没有给我我想要的 - 因为我需要所有数据都在同一个数据帧中。
我还考虑在每次迭代完成时将数据作为新行写入文件:
temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
put_it=function(i){cat(vals[which(vals[,1]==i)[1],1],
',',paste(vals[which(vals[,1]>=i & vals[,1]<=i+10000),2],
sep=' '),'\n',sep=' ',append=T,
file='~/files/test.csv')}
mclapply(temp,put_it,mc.cores = detectCores())
请注意,这次我添加了 10000 个向量,而不仅仅是接下来的 2 个值
但是,当同时执行 2 个作业并且我得到一个包含多个新行的文件时,这会在其他行的中间开始时出现问题:
[middle of a row]........0 0 0 0 01 0, 00 00 00 00 0 0 0 0 0 0 0 .....
此任务不需要循环,可以使用矢量化方法。您只需要创建一个序列来指定应从中提取值的行。下面是一个您可能会采用的简短示例。我希望我能正确理解你的问题,这就是你需要的输出。让我知道这是否适合你。
通过示例更新了答案 foreach
强调对于给定示例 不需要并行化。刚刚添加的 foreach 示例是为了展示如何执行并行化的一种可能方式。 (请注意,belwo 示例在分块等方面不是故障安全的,对于并行化期间更复杂的引用,您需要考虑如何拆分数据和生成引用)。
set.seed(0)
data <- data.frame(idx = 1:10, val = sample(101:110, 10))
# idx val
# 1 1 109
# 2 2 103
# 3 3 110
# 4 4 105
# 5 5 106
# 6 6 102
# 7 7 104
# 8 8 108
# 9 9 107
# 10 10 101
#specify which rows shall be used for extraction
extract <- seq(from = 2, to = nrow(data), by = 2)
#[1] 2 4 6 8 10
#to get, e.g., entries of each following row simply add +1 to the extraction sequence
#and so on +2/+3, etc. for additional entries
data_extracted <- cbind(X1 = data[extract, "val" ], X2 = data[extract+1, "val"])
data_extracted
# X1 X2
# [1,] 103 110
# [2,] 105 106
# [3,] 102 104
# [4,] 108 107
# [5,] 101 NA
#parallel version with foreach
#certainly not the most elegant approach and not failsafe concerning chunking/splitting
library(foreach)
library(parallel)
library(doParallel)
n_cores <- 2
data_rows <- 1:nrow(data)
chunk_size <- nrow(data)/n_cores
#chunking solution from here:
chunk_rows <- split(data_rows,
ceiling(seq_along(data_rows)/(chunk_size))
)
chunk_ext <- split(extract, c(rep(1:length(chunk_rows), each = floor(chunk_size/2)), length(chunk_rows)))
cluster <- parallel::makeCluster(n_cores)
doParallel::registerDoParallel(cluster)
data_extracted_parallel <- foreach(j = 1:length(chunk_rows)
,.combine = rbind) %dopar% {
chunk_dat <- data[chunk_rows[[j]], ]
chunk_ext <- chunk_ext[[j]]
chunk_ext <- which( chunk_dat$idx %in% chunk_ext)
cbind(X1 = chunk_dat[ chunk_ext, "val" ], X2 = chunk_dat[chunk_ext+1, "val"])
}
stopCluster(cluster)
all.equal(data_extracted_parallel, data_extracted)
#[1] TRUE
我有一个结构如下的大型数据框:
vals
idx v
1 1 3
2 2 2
3 3 0
4 4 2
5 5 0
6 6 0
7 7 0
.
.
.
我需要通过以下方式将这个数据框的内容放入一个csv文件中: 我需要按以下步骤遍历 'idx' 列,例如 2,并且每隔一个 idx 值,需要相应行中的 'v' 值和下一个 2 'v'低于此值。
因此采用上述示例数据框的前 7 行:
> d=data.frame()
> temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
> for(i in temp){d=rbind(d,c(vals[which(vals[,1]==i)[1],1],vals[which(vals[,1]>=i & vals[,1]<=i+2),2]))}
> d
X1 X3 X2 X0
1 1 3 2 0
2 3 0 2 0
3 5 0 0 0
上面的代码给了我想要的。然而,实际上我正在使用的 'vals' 数据框非常大,这需要花费不可行的时间来处理...... 我正在尝试为上述代码的并行化版本找到一个可行的解决方案:
> d=data.frame()
> temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
> put_it=function(i){d=rbind(d,c(vals[which(vals[,1]==i)[1],1],vals[which(vals[,1]>=i & vals[,1]<=i+2),2]))}
> mclapply(temp,put_it,mc.cores = detectCores()
[[1]]
X1 X3 X2 X0
1 1 3 2 0
[[2]]
X3 X0 X2 X0.1
1 3 0 2 0
[[3]]
X5 X0 X0.1 X0.2
1 5 0 0 0
因此每次都会重置 'd' 数据帧,这并没有给我我想要的 - 因为我需要所有数据都在同一个数据帧中。
我还考虑在每次迭代完成时将数据作为新行写入文件:
temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
put_it=function(i){cat(vals[which(vals[,1]==i)[1],1],
',',paste(vals[which(vals[,1]>=i & vals[,1]<=i+10000),2],
sep=' '),'\n',sep=' ',append=T,
file='~/files/test.csv')}
mclapply(temp,put_it,mc.cores = detectCores())
请注意,这次我添加了 10000 个向量,而不仅仅是接下来的 2 个值 但是,当同时执行 2 个作业并且我得到一个包含多个新行的文件时,这会在其他行的中间开始时出现问题:
[middle of a row]........0 0 0 0 01 0, 00 00 00 00 0 0 0 0 0 0 0 .....
此任务不需要循环,可以使用矢量化方法。您只需要创建一个序列来指定应从中提取值的行。下面是一个您可能会采用的简短示例。我希望我能正确理解你的问题,这就是你需要的输出。让我知道这是否适合你。
通过示例更新了答案 foreach
强调对于给定示例 不需要并行化。刚刚添加的 foreach 示例是为了展示如何执行并行化的一种可能方式。 (请注意,belwo 示例在分块等方面不是故障安全的,对于并行化期间更复杂的引用,您需要考虑如何拆分数据和生成引用)。
set.seed(0)
data <- data.frame(idx = 1:10, val = sample(101:110, 10))
# idx val
# 1 1 109
# 2 2 103
# 3 3 110
# 4 4 105
# 5 5 106
# 6 6 102
# 7 7 104
# 8 8 108
# 9 9 107
# 10 10 101
#specify which rows shall be used for extraction
extract <- seq(from = 2, to = nrow(data), by = 2)
#[1] 2 4 6 8 10
#to get, e.g., entries of each following row simply add +1 to the extraction sequence
#and so on +2/+3, etc. for additional entries
data_extracted <- cbind(X1 = data[extract, "val" ], X2 = data[extract+1, "val"])
data_extracted
# X1 X2
# [1,] 103 110
# [2,] 105 106
# [3,] 102 104
# [4,] 108 107
# [5,] 101 NA
#parallel version with foreach
#certainly not the most elegant approach and not failsafe concerning chunking/splitting
library(foreach)
library(parallel)
library(doParallel)
n_cores <- 2
data_rows <- 1:nrow(data)
chunk_size <- nrow(data)/n_cores
#chunking solution from here:
chunk_rows <- split(data_rows,
ceiling(seq_along(data_rows)/(chunk_size))
)
chunk_ext <- split(extract, c(rep(1:length(chunk_rows), each = floor(chunk_size/2)), length(chunk_rows)))
cluster <- parallel::makeCluster(n_cores)
doParallel::registerDoParallel(cluster)
data_extracted_parallel <- foreach(j = 1:length(chunk_rows)
,.combine = rbind) %dopar% {
chunk_dat <- data[chunk_rows[[j]], ]
chunk_ext <- chunk_ext[[j]]
chunk_ext <- which( chunk_dat$idx %in% chunk_ext)
cbind(X1 = chunk_dat[ chunk_ext, "val" ], X2 = chunk_dat[chunk_ext+1, "val"])
}
stopCluster(cluster)
all.equal(data_extracted_parallel, data_extracted)
#[1] TRUE