How can I train a random forest with a sparse matrix in Spark?

Consider this simple example using sparklyr:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

The data frame is reasonably small (around 70k rows and 14k unique words).

Now, training a naive Bayes model on my cluster takes only a few seconds. First, I define the pipeline:

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

Then I train the naive Bayes model:

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

Now the problem is that trying to run any tree-based model (random forest, boosted trees, etc.) on the same (actually tiny!!) dataset just won't work.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# won't work :(

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1580, 1.1.1.1.1, executor 5): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

I presume this is due to the sparsity of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A Spark problem? Is my code inefficient?

Thanks!

Could you provide the full error traceback?

My guess is that you are running out of memory. Random forests and GBTs are ensemble models, so they require more memory and computational power than naive Bayes.

Try repartitioning your data (the value of spark.sparkContext.defaultParallelism is a good starting point) so that each of your workers gets smaller, more evenly distributed chunks.
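
For example, a minimal sketch (assuming an open sparklyr connection sc and the mytext_spark table from the question; the exact partition count is something you may want to tune):

# read defaultParallelism from the SparkContext through sparklyr's invoke()
# interface and reuse it as the partition count
default_par <- sc %>% 
  spark_context() %>% 
  invoke("defaultParallelism")

mytext_spark <- sdf_repartition(mytext_spark, partitions = default_par)
sdf_num_partitions(mytext_spark)  # confirm the new partition count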

If that doesn't work, try reducing the max_memory_in_mb parameter to 256.
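
Something along these lines (a hypothetical variant of pipeline2 from the question; only the GBT stage parameters change, and the name pipeline2_small is made up):

# same stages as pipeline2, but with a much smaller statistics buffer for the GBT
pipeline2_small <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier(label_col = "label",
                    features_col = "finaltoken",
                    prediction_col = "pcol",
                    probability_col = "prcol",
                    raw_prediction_col = "rpcol",
                    max_memory_in_mb = 256,   # down from 10240
                    cache_node_ids = TRUE)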

You are getting this error because you are actually hitting the famous 2GB limit in Spark https://issues.apache.org/jira/browse/SPARK-6235: no single block (and hence no single cached or shuffled partition) may exceed 2GB, because Spark addresses block sizes with a Java Integer.

The solution is to repartition your data before feeding it to the algorithm.

There are actually two gotchas in this post:

  • Working with local data.
  • Tree-based models in Spark are memory hungry.

So, let's review your code; it seems harmless:

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>% 
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

So what does that last line do?

copy_to (which is not designed for big data sets) actually just copies the local R data frame into a 1-partition Spark DataFrame.
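
You can check this yourself (a quick sketch, assuming the sc connection and the mytext data frame from the question):

# the freshly copied table ends up in a single partition
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
sdf_num_partitions(mytext_spark)
# expected to report 1 here, per the explanation above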

So you just need to repartition your data to make sure that, once the pipeline has prepared your data right before feeding it into gbt, the partition size is less than 2GB.

So you can simply do the following to repartition your data:

# 20 is an arbitrary number I chose to test and it seems to work well in this case, 
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <- 
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% 
 sdf_repartition(partitions = 20)

PS1: max_memory_in_mb is the amount of memory you are giving the gbt to compute its statistics. It is not related directly to the amount of input data.

PS2: If you didn't set up enough memory for your executors, you might run into a java.lang.OutOfMemoryError: GC overhead limit exceeded.

EDIT: What does repartitioning your data mean?

Before talking about repartitioning, we should refer to the definition of a partition. I'll try to be brief.

A partition is a logical chunk of a large distributed data set.

Spark manages data using partitions that helps parallelize distributed data processing with minimal network traffic for sending data between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks.

Increasing partitions count will make each partition to have less data (or not at all!)

source: excerpt from @JacekLaskowski Mastering Apache Spark book.

But data partitioning isn't always right, like in this case. So repartitioning is needed (sdf_repartition in sparklyr).

sdf_repartition will scatter and shuffle your data across your nodes, i.e. sdf_repartition(20) will create 20 partitions for your data instead of the 1 you originally had in this case.
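
As a quick sanity check (a sketch, reusing the repartitioned table from above):

# verify the shuffle happened: the table should now report 20 partitions
sdf_num_partitions(mytext_spark)

# sdf_repartition can also hash-partition by one or more columns if that suits
# your workload better, e.g. (hypothetical choice of column):
mytext_spark %>% sdf_repartition(partitions = 20, partition_by = "book")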

Hope this helps.

The whole code:

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>% 
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_naive_bayes( label_col = "label", 
                  features_col = "finaltoken", 
                  prediction_col = "pcol",
                  probability_col = "prcol", 
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial", 
                  smoothing = 0, 
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>% 
  ml_gbt_classifier( label_col = "label", 
                     features_col = "finaltoken", 
                     prediction_col = "pcol",
                     probability_col = "prcol", 
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # memory budget for the gbt to aggregate its statistics (see PS1 above)
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input_col = 'text',
                     output_col = 'mytoken',
                     pattern = "\\s+",
                     gaps = TRUE) %>% 
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') 

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7> 
#   Stages 
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b> 
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489> 
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list> 
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213> 
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ... 
# |      num_classes:  int 2 
# |      num_features:  int 39158 
# |      total_num_nodes:  int 540 
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ... 
# |      trees: <list>