How can I train a random forest with a sparse matrix in Spark?
Consider this simple example which uses sparklyr:
library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)
mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
# Source: table<mytext_spark> [?? x 3]
# Database: spark_connection
text book label
<chr> <chr> <int>
1 SENSE AND SENSIBILITY Sense & Sensibility 0
2 "" Sense & Sensibility 0
3 by Jane Austen Sense & Sensibility 0
4 "" Sense & Sensibility 0
5 (1811) Sense & Sensibility 0
6 "" Sense & Sensibility 0
7 "" Sense & Sensibility 0
8 "" Sense & Sensibility 0
9 "" Sense & Sensibility 0
10 CHAPTER 1 Sense & Sensibility 0
11 "" Sense & Sensibility 0
12 "" Sense & Sensibility 0
13 The family of Dashwood had long been settled in Sussex. Their estate Sense & Sensibility 0
14 was large, and their residence was at Norland Park, in the centre of Sense & Sensibility 0
15 their property, where, for many generations, they had lived in so Sense & Sensibility 0
16 respectable a manner as to engage the general good opinion of their Sense & Sensibility 0
The data frame is reasonably small in size (around 70k rows and 14k unique words).
Now, training a naive bayes model on my cluster only takes a few seconds.
First, I define the pipeline:
pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes(label_col = "label",
                 features_col = "finaltoken",
                 prediction_col = "pcol",
                 probability_col = "prcol",
                 raw_prediction_col = "rpcol",
                 model_type = "multinomial",
                 smoothing = 0,
                 thresholds = c(1, 1))
Then I train the naive bayes model:
> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
expr min lq mean median uq max neval
model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832 3
The problem now is that trying to run any tree-based model (random forest, boosted trees, etc.) on the same (and actually tiny!!) dataset will not work.
pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier(label_col = "label",
                    features_col = "finaltoken",
                    prediction_col = "pcol",
                    probability_col = "prcol",
                    raw_prediction_col = "rpcol",
                    max_memory_in_mb = 10240,
                    cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
# won't work :(
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 69.0 (TID 1580, 1.1.1.1.1, executor 5):
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I assume this is due to the sparsity of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A spark problem? Is my code inefficient?
Thanks!
Can you provide the full error traceback?
My guess is that you are running out of memory. Random forests and gradient-boosted trees are ensemble models, so they require more memory and computing power than naive bayes.
Try repartitioning your data (the value of spark.sparkContext.defaultParallelism is a good starting point) so that each of your workers gets a smaller and more evenly distributed chunk.
If that doesn't work, try reducing the max_memory_in_mb parameter to 256.
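A minimal sketch of both suggestions in sparklyr (my own code, not the OP's, assuming the connection sc and the table mytext_spark from the question):
# use Spark's default parallelism as the partition count
n_parts <- invoke(spark_context(sc), "defaultParallelism")
mytext_spark <- sdf_repartition(mytext_spark, partitions = n_parts)
# and, in pipeline2, lower the memory budget of the GBT stage:
#   ml_gbt_classifier(..., max_memory_in_mb = 256, ...)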
You are getting this error because you are actually hitting the famous 2 GB limit in Spark: https://issues.apache.org/jira/browse/SPARK-6235
The solution is to repartition your data before feeding it to the algorithm.
There are actually two gotchas in this post:
- Working with local data.
- Tree-based models in Spark are memory hungry.
So, let's review your code, which seems harmless:
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
So what does the last line do?
copy_to (which is not designed for big data sets) actually just copies the local R data frame into a 1-partition Spark DataFrame.
So you just need to repartition your data to make sure that, once the pipeline has prepared your data and before it is fed to gbt, each partition is smaller than 2 GB.
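You can confirm the single-partition claim with sdf_num_partitions() (a quick check I'm adding here; it is not part of the original answer):
sdf_num_partitions(mytext_spark)
# [1] 1   <- a single partition straight out of copy_to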
So you can simply do the following to repartition your data:
# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
  copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
  sdf_repartition(partitions = 20)
PS1: max_memory_in_mb is the amount of memory you are giving the gbt to compute its statistics. It is not related directly to the amount of data used as input.
PS2: If you didn't set up enough memory for your executors, you might run into a java.lang.OutOfMemoryError: GC overhead limit exceeded.
EDIT: What does repartitioning the data mean?
Before talking about repartitioning, we can refer to the definition of a partition. I'll try to be brief.
A partition is a logical chunk of a large distributed data set.
Spark manages data using partitions that helps parallelize distributed data processing with minimal network traffic for sending data between executors.
By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks.
Increasing partitions count will make each partition to have less data (or not at all!)
source: excerpt from @JacekLaskowski's Mastering Apache Spark book.
But data isn't always partitioned appropriately, as in this case, so repartitioning is needed (sdf_repartition for sparklyr).
sdf_repartition will scatter and shuffle your data across your nodes, i.e. sdf_repartition(20) will create 20 partitions of your data instead of the 1 you originally had in this case.
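The same check as before, run after the repartition call (again my addition), confirms the new layout:
sdf_num_partitions(mytext_spark)
# [1] 20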
I hope this helps.
Whole code:
library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)
pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes(label_col = "label",
                 features_col = "finaltoken",
                 prediction_col = "pcol",
                 probability_col = "prcol",
                 raw_prediction_col = "rpcol",
                 model_type = "multinomial",
                 smoothing = 0,
                 thresholds = c(1, 1))
library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier(label_col = "label",
                    features_col = "finaltoken",
                    prediction_col = "pcol",
                    probability_col = "prcol",
                    raw_prediction_col = "rpcol",
                    max_memory_in_mb = 10240, # memory given to the GBT to compute its statistics (see PS1 above)
                    cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')
model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
# Stages
# |--1 RegexTokenizer (Transformer)
# | <regex_tokenizer_1ce4342b543b>
# | (Parameters -- Column Names)
# | input_col: text
# | output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# | <count_vectorizer_1ce4e0e6489>
# | (Parameters -- Column Names)
# | input_col: mytoken
# | output_col: finaltoken
# | (Transformer Info)
# | vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# | <gbt_classifier_1ce41ab30213>
# | (Parameters -- Column Names)
# | features_col: finaltoken
# | label_col: label
# | prediction_col: pcol
# | probability_col: prcol
# | raw_prediction_col: rpcol
# | (Transformer Info)
# | feature_importances: num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# | num_classes: int 2
# | num_features: int 39158
# | total_num_nodes: int 540
# | tree_weights: num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# | trees: <list>