R用并行代码替换数据框中的值

R replacing values in dataframe, with paralellised code

我有一个结构如下的大型数据框:

vals
  idx v
1   1 3
2   2 2
3   3 0
4   4 2
5   5 0
6   6 0
7   7 0
.
.
.

我需要通过以下方式将这个数据框的内容放入一个csv文件中: 我需要按以下步骤遍历 'idx' 列,例如 2,并且每隔一个 idx 值,需要相应行中的 'v' 值和下一个 2 'v'低于此值。

因此采用上述示例数据框的前 7 行:

> d=data.frame()
> temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
> for(i in temp){d=rbind(d,c(vals[which(vals[,1]==i)[1],1],vals[which(vals[,1]>=i & vals[,1]<=i+2),2]))}
> d
  X1 X3 X2 X0
1  1  3  2  0
2  3  0  2  0
3  5  0  0  0

上面的代码给了我想要的。然而,实际上我正在使用的 'vals' 数据框非常大,这需要花费不可行的时间来处理...... 我正在尝试为上述代码的并行化版本找到一个可行的解决方案:

> d=data.frame()
> temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
> put_it=function(i){d=rbind(d,c(vals[which(vals[,1]==i)[1],1],vals[which(vals[,1]>=i & vals[,1]<=i+2),2]))}
> mclapply(temp,put_it,mc.cores = detectCores()
[[1]]
  X1 X3 X2 X0
1  1  3  2  0

[[2]]
  X3 X0 X2 X0.1
1  3  0  2    0

[[3]]
  X5 X0 X0.1 X0.2
1  5  0    0    0

因此每次都会重置 'd' 数据帧,这并没有给我我想要的 - 因为我需要所有数据都在同一个数据帧中。

我还考虑在每次迭代完成时将数据作为新行写入文件:

temp=seq(vals[1,1],vals[nrow(vals),1]-1,2)
put_it=function(i){cat(vals[which(vals[,1]==i)[1],1],
         ',',paste(vals[which(vals[,1]>=i & vals[,1]<=i+10000),2],
          sep=' '),'\n',sep=' ',append=T,
           file='~/files/test.csv')}
mclapply(temp,put_it,mc.cores = detectCores())

请注意,这次我添加了 10000 个向量,而不仅仅是接下来的 2 个值 但是,当同时执行 2 个作业并且我得到一个包含多个新行的文件时,这会在其他行的中间开始时出现问题:

 [middle of a row]........0 0 0 0 01  0,  00  00  00  00  0 0 0 0 0 0 0 .....

此任务不需要循环,可以使用矢量化方法。您只需要创建一个序列来指定应从中提取值的行。下面是一个您可能会采用的简短示例。我希望我能正确理解你的问题,这就是你需要的输出。让我知道这是否适合你。

通过示例更新了答案 foreach 强调对于给定示例 不需要并行化。刚刚添加的 foreach 示例是为了展示如何执行并行化的一种可能方式。 (请注意,belwo 示例在分块等方面不是故障安全的,对于并行化期间更复杂的引用,您需要考虑如何拆分数据和生成引用)。

set.seed(0)
data <- data.frame(idx = 1:10, val = sample(101:110, 10))
#    idx val
# 1    1 109
# 2    2 103
# 3    3 110
# 4    4 105
# 5    5 106
# 6    6 102
# 7    7 104
# 8    8 108
# 9    9 107
# 10  10 101
#specify which rows shall be used for extraction
extract <- seq(from = 2, to = nrow(data), by = 2)
#[1] 2  4  6  8 10
#to get, e.g., entries of each following row simply add +1 to the extraction sequence
#and so on +2/+3, etc. for additional entries
data_extracted <- cbind(X1 = data[extract, "val" ], X2 = data[extract+1, "val"])
data_extracted
#       X1  X2
# [1,] 103 110
# [2,] 105 106
# [3,] 102 104
# [4,] 108 107
# [5,] 101  NA  

#parallel version with foreach
#certainly not the most elegant approach and not failsafe concerning chunking/splitting
library(foreach)
library(parallel)
library(doParallel)

n_cores <- 2

data_rows <- 1:nrow(data)
chunk_size <- nrow(data)/n_cores
#chunking solution from here: 
chunk_rows <- split(data_rows,
                     ceiling(seq_along(data_rows)/(chunk_size))
                     )

chunk_ext <- split(extract, c(rep(1:length(chunk_rows), each = floor(chunk_size/2)), length(chunk_rows)))

cluster <- parallel::makeCluster(n_cores)
doParallel::registerDoParallel(cluster)

data_extracted_parallel <- foreach(j = 1:length(chunk_rows)
        ,.combine = rbind) %dopar% {
          chunk_dat <- data[chunk_rows[[j]], ]
          chunk_ext <-  chunk_ext[[j]]
          chunk_ext  <- which( chunk_dat$idx %in% chunk_ext)
          cbind(X1 = chunk_dat[ chunk_ext, "val" ], X2 =  chunk_dat[chunk_ext+1, "val"])
}

stopCluster(cluster)

all.equal(data_extracted_parallel, data_extracted)
#[1] TRUE