R:为什么并行(多)慢?将并行用于(左)连接大量大文件的最佳策略是什么?
R: Why parallel is (much) slower? What is best strategy in using parallel for a (left) join a large collection of big files?
我已经阅读了一些关于这些主题的问题以及一些教程,但未能解决我的问题,所以决定问自己。
我收集了大量类型为 A、B、C 的大文件;在某些情况下,我需要离开加入 B、C 和 A。我在一台有 64 CPU 和 240GB 内存的远程服务器上工作,所以我很自然地想并行使用它的电源和进程。我掌握的一个重要知识是,如果 a_i 文件只能与 b_i、来自 B 的 b_(i+1) 成功连接,C 也一样。
我最初的尝试是为 'a_i' 文件设置一个 'join_i' 函数,然后并行地 运行 它(我有 448 个文件)。但是,时间没有显着改善,事实上,当我观察性能时 - 可悲的是,CPU 的加载百分比非常低。据我深入研究这个问题,我认为瓶颈是 IO,特别是因为所有文件都很大。这是一个有效的假设吗?
无论如何,在第二次尝试中,我决定按顺序检查每个文件,但在每次迭代中使用并行优势。然而,经过无数次尝试,我在这里也没有得到任何运气。我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?这是代码错误还是对 R 中的并行工作原理有更深层次的误解?另外,我尝试了一些 multidplyr 和 mclapply 但在这两种情况下都没有运气。
我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩)而连接大约需要 10 秒。鉴于此,这里最好的策略是什么?
提前致谢!
library(dplyr)
A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
chunk_join=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}
library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)
# not parallel
s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %do%
{
join_i=chunk_join(j, A, B, C)
}
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))
# parallel
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %dopar%
{
join_i=chunk_join(j, A, B, C)
}
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))
R=rbind(r1, r2)
T=rbind(t1, t2)
R
T
在我的服务器上,%do% 大约为 5s,%dopar% 超过 1m。请注意,这是为了连接本身,甚至没有考虑创建集群的时间。
顺便说一句,有人也可以评论我应该有多少簇吗?比如说,我将数据划分为 X 个均匀大小的块,并且有 Y CPU 可用 - 我应该尽可能多地放置 Y - 还是 X,或其他一些簇?
您的多线程运行缓慢有两个问题:
1) 数据传输到新线程
2) 数据从新线程传回主线程
问题 #1 通过在 unix 系统上使用 mclapply
完全避免,除非数据被修改,否则不会复制数据。 (makeCluster
默认使用套接字传输数据)。
使用 mclapply
无法避免问题 #2,但您可以做的是尽量减少传输回主线程的数据量。
天真的mclapply:
join3 = mclapply(1:10, function(j) {
join_i=chunk_join(j, A, B, C)
}, mc.cores=4) %>% rbindlist
稍微聪明一点的mclapply:
chunk_join2=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
join_i[,c(-1,-2,-3)]
}
A <- arrange(A, X2)
join5 = mclapply(1:10, function(j) {
join_i=chunk_join2(j, A, B, C)
}, mc.cores=4) %>% rbindlist
join5 <- cbind(A, join5)
基准:
Single threaded: 4.014s
Naive mclapply: 1.860 s
Slightly smarter mclapply: 1.363 s
如果您的数据有很多列,您可以看到问题 #2 将如何使系统完全陷入困境。你可以做得更好,例如返回 B 和 C 的索引而不是整个 data.frame 子集。
我已经阅读了一些关于这些主题的问题以及一些教程,但未能解决我的问题,所以决定问自己。
我收集了大量类型为 A、B、C 的大文件;在某些情况下,我需要离开加入 B、C 和 A。我在一台有 64 CPU 和 240GB 内存的远程服务器上工作,所以我很自然地想并行使用它的电源和进程。我掌握的一个重要知识是,如果 a_i 文件只能与 b_i、来自 B 的 b_(i+1) 成功连接,C 也一样。 我最初的尝试是为 'a_i' 文件设置一个 'join_i' 函数,然后并行地 运行 它(我有 448 个文件)。但是,时间没有显着改善,事实上,当我观察性能时 - 可悲的是,CPU 的加载百分比非常低。据我深入研究这个问题,我认为瓶颈是 IO,特别是因为所有文件都很大。这是一个有效的假设吗? 无论如何,在第二次尝试中,我决定按顺序检查每个文件,但在每次迭代中使用并行优势。然而,经过无数次尝试,我在这里也没有得到任何运气。我试图在下面做一个最小的例子,其中并行要慢得多(实际上在我的真实数据上它冻结了)。这里有什么问题?这是代码错误还是对 R 中的并行工作原理有更深层次的误解?另外,我尝试了一些 multidplyr 和 mclapply 但在这两种情况下都没有运气。 我还想指出,读取文件需要的不仅仅是连接本身:在 1 次迭代中读取大约需要 30 秒(我使用 fread,通过 cmd 在其中解压缩)而连接大约需要 10 秒。鉴于此,这里最好的策略是什么? 提前致谢!
library(dplyr)
A=data.frame(cbind('a', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
B=data.frame(cbind('b', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
C=data.frame(cbind('c', c(1:10), sample(1:(2*10^6), 10^6, replace=F))) %>% mutate_all(as.character)
chunk_join=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
}
library(parallel)
library(foreach)
cl = parallel::makeCluster(10)
doParallel::registerDoParallel(cl)
# not parallel
s1=Sys.time()
join1=data.frame()
join1 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %do%
{
join_i=chunk_join(j, A, B, C)
}
t1=Sys.time()-s1
colnames(join1)[4:5]=c('joinedB', 'joinedC')
r1=c(sum(!is.na(join1$joinedB)), sum(!is.na(join1$joinedC)))
# parallel
s2=Sys.time()
join2=data.frame()
join2 = foreach(j=1:10, .combine='rbind',
.packages=c('dplyr'),
.export=c('chunk_join','A', 'B', 'C')) %dopar%
{
join_i=chunk_join(j, A, B, C)
}
t2=Sys.time()-s2
stopCluster(cl)
colnames(join2)[4:5]=c('joinedB', 'joinedC')
r2=c(sum(!is.na(join2$joinedB)), sum(!is.na(join2$joinedC)))
R=rbind(r1, r2)
T=rbind(t1, t2)
R
T
在我的服务器上,%do% 大约为 5s,%dopar% 超过 1m。请注意,这是为了连接本身,甚至没有考虑创建集群的时间。 顺便说一句,有人也可以评论我应该有多少簇吗?比如说,我将数据划分为 X 个均匀大小的块,并且有 Y CPU 可用 - 我应该尽可能多地放置 Y - 还是 X,或其他一些簇?
您的多线程运行缓慢有两个问题:
1) 数据传输到新线程 2) 数据从新线程传回主线程
问题 #1 通过在 unix 系统上使用 mclapply
完全避免,除非数据被修改,否则不会复制数据。 (makeCluster
默认使用套接字传输数据)。
使用 mclapply
无法避免问题 #2,但您可以做的是尽量减少传输回主线程的数据量。
天真的mclapply:
join3 = mclapply(1:10, function(j) {
join_i=chunk_join(j, A, B, C)
}, mc.cores=4) %>% rbindlist
稍微聪明一点的mclapply:
chunk_join2=function(i, A, B, C)
{
A_i=A %>% filter(X2==i)
B_i=B %>% filter(X2==i) %>% select(X1, X3)
C_i=C %>% filter(X2==i) %>% select(X1, X3)
join_i=A_i %>% left_join(B_i, by=c('X3')) %>% left_join(C_i, by=c('X3'))
join_i[,c(-1,-2,-3)]
}
A <- arrange(A, X2)
join5 = mclapply(1:10, function(j) {
join_i=chunk_join2(j, A, B, C)
}, mc.cores=4) %>% rbindlist
join5 <- cbind(A, join5)
基准:
Single threaded: 4.014s
Naive mclapply: 1.860 s
Slightly smarter mclapply: 1.363 s
如果您的数据有很多列,您可以看到问题 #2 将如何使系统完全陷入困境。你可以做得更好,例如返回 B 和 C 的索引而不是整个 data.frame 子集。