非常 slow/stuck hadoop streaming with R
Very slow/stuck hadoop streaming with R
我正在尝试使用 R 编写自定义 map-reduce。这是我的映射器函数:
#! /usr/bin/env Rscript
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
# in case of empty lines
if(nchar(line) == 0) break
# split line into data
data = unlist(strsplit(line, ","))
# output scores with cat()
cat(data[2],"|",data[3],"|",data[4]
,"\t" # reduce key followed by tab
,paste(data[1],paste(unlist(data[5:length(data)]),collapse=","),sep = ",") # all other fields separated by commas
,"\n",sep='') # line break
}
close(input)
所以基本上 3 列的组合是我在这里的关键;其余列将有价值。一旦我获得属于单个 reducer 节点中特定键的所有数据,这些数据将由下面的 reducer 代码处理:
first_line <- TRUE
first_time <- TRUE
prev_id <- ""
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
if(nchar(line) == 0) break
if(first_time == TRUE){
first_time = FALSE
next
}
id <- unlist(strsplit(line,"\t"))[1]
data0 <- unlist(strsplit(line,"\t"))[2]
data1 = data.frame(t(unlist(strsplit(data0, ","))),stringsAsFactors=FALSE)
colnames(data1) = c('ITEM_I','BOH','EOH','LATTD_I','LNGTD_I')
data1$DEPT = strsplit(id,"\|")[[1]][1]
data1$CLAS = strsplit(id,"\|")[[1]][2]
data1$SBCL = strsplit(id,"\|")[[1]][3]
if(prev_id==id | first_line==T){
if(!exists("base_data")){
base_data <- rbind(data1)
first_line <- F
}else{
base_data <- rbind(base_data,data1)
}
}else{
if(!exists("results")){
results <- BuildDTnProcess(base_data)
base_data <- rbind(data1)
}else{
results <- rbind(results,BuildDTnProcess(base_data))
base_data <- data1
}
}
prev_id <- id
}
close(input)
if(!exists("results")){
results <- BuildDTnProcess(base_data)
}else{
results <- rbind(results,BuildDTnProcess(base_data))
}
base_data <- NULL
所以我试图将属于单个键的所有记录堆积到一个数据框中(同时在出现新键时启动一个新的数据框)。然后将此数据传递给函数 BuildDTnProcess,该函数将执行一些操作以完成由单键观察组成的数据框;结果将存储在结果中。
我观察到此代码会卡住几天然后被终止。所以我已经开始一个一个地添加代码块来识别瓶颈。我已经确定,直到 data1$MDSE_SBCL_REF_I = strsplit(id,"\|")[[1]][3]
代码 运行 没问题,但是当我添加
if(prev_id==id | first_line==T){
if(!exists("base_data")){
base_data <- rbind(data1)
first_line <- F
}else{
base_data <- rbind(base_data,data1)
}
}
然后它变得很慢。在来自
的日志中(20 分钟内完成 运行)
2016-05-11 14:57:26,160 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=200000/0/0 in:1169=200000/171 [rec/s] out:0=0/171 [rec/s]
2016-05-11 14:58:47,346 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=300000/0/0 in:1185=300000/253 [rec/s] out:0=0/253 [rec/s]
2016-05-11 15:00:09,503 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=400000/0/0 in:1194=400000/335 [rec/s] out:0=0/335 [rec/s]
2016-05-11 15:01:33,969 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=500000/0/0 in:1193=500000/419 [rec/s] out:0=0/419 [rec/s]
2016-05-11 15:02:54,523 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=600000/0/0 in:1200=600000/500 [rec/s] out:0=0/500 [rec/s]
在
进行缓慢并卡住(甚至几天后仍未完成)
2016-05-11 13:51:17,543 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10000/0/0 in:87=10000/114 [rec/s] out:0=0/114 [rec/s]
2016-05-11 16:58:16,552 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100000/0/0 in:8=100000/11333 [rec/s] out:0=0/11333 [rec/s]
我在这里遗漏了什么重要的东西吗?
PS:在进行此分析时,我删除了下面提到的瓶颈代码块的所有代码部分。
自己写回答,找到原因了,这个问题至今没有任何回复,甚至没有评论。性能缓慢的根本原因是 "rbind" 操作。 Rbind 实现是这样一种方式,它需要更多时间来附加行;大基数 data.frame 比小基数 data.frame。有关此的更多详细信息,请参见此处 Growing a data.frame in a memory-efficient manner
我自己已经实施了 data.table 以及预填充版本的解决方案,效果非常好。
我正在尝试使用 R 编写自定义 map-reduce。这是我的映射器函数:
#! /usr/bin/env Rscript
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
# in case of empty lines
if(nchar(line) == 0) break
# split line into data
data = unlist(strsplit(line, ","))
# output scores with cat()
cat(data[2],"|",data[3],"|",data[4]
,"\t" # reduce key followed by tab
,paste(data[1],paste(unlist(data[5:length(data)]),collapse=","),sep = ",") # all other fields separated by commas
,"\n",sep='') # line break
}
close(input)
所以基本上 3 列的组合是我在这里的关键;其余列将有价值。一旦我获得属于单个 reducer 节点中特定键的所有数据,这些数据将由下面的 reducer 代码处理:
first_line <- TRUE
first_time <- TRUE
prev_id <- ""
input <- file("stdin", "r")
while(length(line <- readLines(input, n=1, warn=FALSE)) > 0) {
if(nchar(line) == 0) break
if(first_time == TRUE){
first_time = FALSE
next
}
id <- unlist(strsplit(line,"\t"))[1]
data0 <- unlist(strsplit(line,"\t"))[2]
data1 = data.frame(t(unlist(strsplit(data0, ","))),stringsAsFactors=FALSE)
colnames(data1) = c('ITEM_I','BOH','EOH','LATTD_I','LNGTD_I')
data1$DEPT = strsplit(id,"\|")[[1]][1]
data1$CLAS = strsplit(id,"\|")[[1]][2]
data1$SBCL = strsplit(id,"\|")[[1]][3]
if(prev_id==id | first_line==T){
if(!exists("base_data")){
base_data <- rbind(data1)
first_line <- F
}else{
base_data <- rbind(base_data,data1)
}
}else{
if(!exists("results")){
results <- BuildDTnProcess(base_data)
base_data <- rbind(data1)
}else{
results <- rbind(results,BuildDTnProcess(base_data))
base_data <- data1
}
}
prev_id <- id
}
close(input)
if(!exists("results")){
results <- BuildDTnProcess(base_data)
}else{
results <- rbind(results,BuildDTnProcess(base_data))
}
base_data <- NULL
所以我试图将属于单个键的所有记录堆积到一个数据框中(同时在出现新键时启动一个新的数据框)。然后将此数据传递给函数 BuildDTnProcess,该函数将执行一些操作以完成由单键观察组成的数据框;结果将存储在结果中。
我观察到此代码会卡住几天然后被终止。所以我已经开始一个一个地添加代码块来识别瓶颈。我已经确定,直到 data1$MDSE_SBCL_REF_I = strsplit(id,"\|")[[1]][3]
代码 运行 没问题,但是当我添加
if(prev_id==id | first_line==T){
if(!exists("base_data")){
base_data <- rbind(data1)
first_line <- F
}else{
base_data <- rbind(base_data,data1)
}
}
然后它变得很慢。在来自
的日志中(20 分钟内完成 运行)2016-05-11 14:57:26,160 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=200000/0/0 in:1169=200000/171 [rec/s] out:0=0/171 [rec/s] 2016-05-11 14:58:47,346 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=300000/0/0 in:1185=300000/253 [rec/s] out:0=0/253 [rec/s] 2016-05-11 15:00:09,503 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=400000/0/0 in:1194=400000/335 [rec/s] out:0=0/335 [rec/s] 2016-05-11 15:01:33,969 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=500000/0/0 in:1193=500000/419 [rec/s] out:0=0/419 [rec/s] 2016-05-11 15:02:54,523 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=600000/0/0 in:1200=600000/500 [rec/s] out:0=0/500 [rec/s]
在
进行缓慢并卡住(甚至几天后仍未完成)2016-05-11 13:51:17,543 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=10000/0/0 in:87=10000/114 [rec/s] out:0=0/114 [rec/s] 2016-05-11 16:58:16,552 INFO [main] org.apache.hadoop.streaming.PipeMapRed: R/W/S=100000/0/0 in:8=100000/11333 [rec/s] out:0=0/11333 [rec/s]
我在这里遗漏了什么重要的东西吗?
PS:在进行此分析时,我删除了下面提到的瓶颈代码块的所有代码部分。
自己写回答,找到原因了,这个问题至今没有任何回复,甚至没有评论。性能缓慢的根本原因是 "rbind" 操作。 Rbind 实现是这样一种方式,它需要更多时间来附加行;大基数 data.frame 比小基数 data.frame。有关此的更多详细信息,请参见此处 Growing a data.frame in a memory-efficient manner
我自己已经实施了 data.table 以及预填充版本的解决方案,效果非常好。