Dataproc 上的处理速度极慢 9 小时,而本地计算机上的处理时间为 3 分钟

Extremely Slow Processing on Dataproc 9 hours vs 3 mins on local machine

从日志中我可以看到有 182k 行 70MB。在 Dataproc 上训练 182K 行需要 1.5 小时加载 70MB 数据和 9 小时(从 15/11/14 01:58:28 开始到 15/11/14 09:19:09 结束)。在我的本地计算机上加载相同的数据和 运行 相同的算法需要 3 分钟

DataProc 日志

15/11/13 23:27:09 INFO com.google.cloud.hadoop.io.bigquery.ShardedExportToCloudStorage: Table 'mydata-data:website_wtw_feed.video_click20151111' to be exported has 182712 rows and 70281790 bytes
15/11/13 23:28:13 WARN akka.remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@rc-spark-poc-w-1.c.dailymotion-data.internal:60749] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 

15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Fetching the Ratings RDD
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 01:58:28 INFO com.dailymotion.recommender.BigQueryRecommender: Training ALS Matrix factorization Model


[Stage 2:=============================>                             (1 + 1) / 2]

15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/11/14 09:19:09 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the video feature matrix
15/11/14 09:19:44 INFO com.dailymotion.recommender.BigQueryRecommender: Transforming the user feature matrix
  1. 已将数据复制到本地机器

    r.viswanadha$ gsutil cp -r gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000 .
    
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json... 
    
    Downloading ...201511132327_0000/shard-0/data-000000000000.json: 141.3 MiB/141.3 MiB      
    
    Copying gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json... 
    
    Copying gs://<mycompany>-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json...`
    
  2. 运行 相同的算法。 ALS 训练步骤用了大约 3 分钟

    com.dailymotion.recommender.BigQueryRecommender --app_name BigQueryRecommenderTest --master local[4] --input_dir /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/input/job_201511132327_0000/shard-0/ 
    

第一个运行

15/11/14 13:19:36 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model
...
15/11/14 13:22:24 INFO BigQueryRecommender: Transforming the video feature matrix

第二个运行

15/11/14 13:29:05 INFO BigQueryRecommender: Training implicit features for the ALS Matrix factorization Model


...

15/11/14 13:31:57 INFO BigQueryRecommender: Transforming the video feature matrix

DataProc 集群有 1 个 Master 和 3 个 Slave,每个 104GB (RAM) 和 16 CPUs。

我的本地机器有 8GB (RAM) 和 2 CPU 2.7GHz Core i5

gsutil ls -l -r -h  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/: 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/ 

    141.3 MiB  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000000.json 

   0 B  2015-11-13T23:29:21Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-0/data-000000000001.json 

gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/: 

    0 B  2015-11-13T23:27:13Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/ 

    0 B  2015-11-13T23:28:47Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/shard-1/data-000000000000.json 

   0 B  2015-11-13T23:27:09Z  gs://dailymotion-spark-rc-test/bqdata/hadoop/tmp/bigquery/job_201511132327_0000/ 

TOTAL: 6 objects, 148165416 bytes (141.3 MiB)

对于遇到类似问题的任何人:当只处理 GCS 中的单个小对象(或具有来自 BigQuery 连接器的数据的单个分片)时,您最终可能会在 Spark RDD 中得到一个分区,并且作为结果,几乎没有或没有并行性。

虽然这会导致额外的洗牌阶段,但输入 RDD 可以在从 GCS 或 BigQuery 读取后立即重新分区以获得所需的分区数。额外的 shuffle 是否有益取决于 RDD 中的每条记录需要多少处理或 IO。

回顾一些离线发现,当分布式集群上的事情比本地设置慢 运行 个数量级时,要寻找的主要瓶颈是 I/O 往返- 跨网络服务依赖性以及磁盘和本地方面的延迟瓶颈 I/O.

一般要寻找的东西(其中一些可能适用于也可能不适用于您的特定案例,但可能对遇到类似问题的其他人来说很常见):

  1. 确保保存数据的 GCS 存储桶与您使用 gsutil ls -L gs://[your-bucket] 部署 Dataproc 集群的 GCE 区域位于同一区域。跨大陆的流量不仅明显变慢,而且可能会在您的项目中产生额外的网络成本。
  2. 如果您的作业有任何其他网络依赖性,例如在 GCE 上查询 API 或某种单独的数据库 运行,请尝试将它们放在同一区域;即使在同一个大陆,GCE 跨区域流量也可能有数十毫秒的往返延迟,这可能会显着增加,特别是如果有每条记录的请求(例如,30 毫秒 * 180k 条记录得出 1.5 小时).
  3. 即使这一次可能不适用于您的特定情况,请记住尽可能避免通过 Hadoop 文件系统接口将每条记录往返 I/O 到 GCS; GCS 的总体 吞吐量 非常可扩展,但由于远程存储的性质,往返延迟比您可能在本地计算机上测量的往返延迟慢得多,因为本地读取经常命中 OS 缓冲区缓存,或者如果您使用的笔记本电脑配备 SSD,则与 GCS 的 30 毫秒至 100 毫秒往返相比,能够维持大量亚毫秒往返。

一般来说,对于可以支持非常高的吞吐量但遭受长往返延迟的用例,如果数据很小且不自然地划分为足够的并行度,以确保充分利用您的 Spark 集群。

最后,我们最新的 Dataproc release 修复了一堆本机库配置,因此它可能会为 ALS 部分以及其他 mllib 用例显示更好的性能。