Spark/Hadoop工作笔记运行并行

Spark/Hadoop job not running in parralel

我想我遇到了初学者错误,但我真的不知道如何修复它,这让我抓狂。 我有一个由 2 台机器组成的集群:

  1. 8GB RAM(6.9 可用),4 核,Win10 : 运行 是主人,工人,也是我 运行 安装 java 驱动程序的机器程序(来自 IntelliJ)
  2. 2GB RAM(可用 1.3),4 核,Ubuntu VM 上 16.04(运行在 VBox 中):运行是一个工人

我有一个 class 网络,我想通过从具有单个网络的列表开始,然后使用平面图将每个网络转换为 N 个新网络,在 for 循环中生成网络。之后我有一个过滤器和一个计数。步骤:

    JavaSparkContext sc = new JavaSparkContext(conf);
    List<Network> data = Arrays.asList(new Network());
    JavaRDD<Network> currentN = sc.parallelize(data);
    for(int k=1;k<=10;k++) {
        JavaRDD<Network> newN = currentN.flatMap(new MyFlatMap());
        currentN = newN;
    }
    JavaRDD<Network> filteredNetworks = currentN.filter(new MyFilter());
    System.out.println(filteredNetworks.count());

算法有效并输出正确的值。

但是通过比较不同场景下应用的时长,我倾向于认为应用不是运行并行的:

使用两台机器的集群,总共有 2GB 内存和 8 个内核:持续时间 1.5 分钟

仅使用第二台机器(VM Ubuntu)、1GB RAM、4 核的集群:持续时间 1,1 分钟

仅使用第一台机器(master、worker 和驱动程序)、1GB RAM、4 核的集群:持续时间 3,2 分钟

我的火花截图UI:

活着的工人

第一个 运行 的事件时间表(两台机器):

按执行器聚合指标(两台机器)

我不知道为什么运行s Ubuntu在虚拟机中的机器比主机(master,worker和驱动程序)快,因为主机有更好的 CPU(i7 2.6GHZ 与 i3 1,9GHZ 相比)。

但主要问题是 为什么单台机器上的 运行ning 比两台机器上的 运行ning 都快? 不应该是另一台吗绕开?我的猜测是 RDD 不是并行计算的。如果是这种情况,您能否解释一下为什么以及如何让它并行处理?

工作内容说明:

基本上,这就是我想在for循环中实现的:

我从 1 Network 的 RDD 开始(它不是文件,它只是一个小 class)。

在 for 循环中,我使用 flatMap 将 1 个网络转换为 10 个新网络。

迭代 0:currentN = 1 个网络 -> flatMap -> currentN = 10 个网络

迭代 1:currentN = 10 个网络 -> flatMap -> currentN = 100 个网络

。 .

迭代 9:currentN = 10^8 个网络 -> flatMap -> currentN = 10^9 个网络

正如我所说,我正在 生成输入 。我想并行生成,所以这意味着 flatMap 需要并行完成。为此,spark 应该:

  1. 带N个网络的RDD

  2. 将RDD分成8个partitions for each core,每个partition有N/8 Networks

  3. 在每台机器上并行应用 flatMap,将每个 N/8 个网络转换为 N/8*10 个新网络。

  4. 在每台机器上重复这些步骤,使用 flatMap 并行生成。

  5. for循环结束后,每台机器应该有10^9 / 8个网络。并行过滤,然后统计每台机器上每个RDD的元素个数,并行输出结果。

这是我想要实现的,但由于某些原因,for 循环中的 flatMap 生成仅在一台机器上完成。

spark UI 的最后一个屏幕截图显示,在您的 8 个任务中,有 7 个以 37 毫秒的最长时间完成,而最长的任务是 运行ning 至少 46秒。

如果您的一个任务 运行 持续 3 分钟,而其他任务 运行 不到一秒,则您的分布式计算不平衡,因此您无法利用多台机器 运行ning 完全是因为计算时间受到最长任务的限制。

这种行为通常是由不平衡的operation/transformation(join,...)的不平衡大小的输入(1 Ko 的 7 个文件和 1 Go 的 1 个文件)引起的。

最后,如果不知道你的工作是什么,很难解释你的时间安排与 CPU,但一个可能的解释是你有一个数据密集型工作(而不是 CPU 密集型工作)因此瓶颈是硬盘驱动器(机器上的 SSD 最慢 CPU)。

我终于设法解决了这个问题。这个错误是如此愚蠢和明显,但我花了很长时间才弄清楚...

正如我之前提到的,我不是从文件中读取输入,而是从 1 Network 开始生成输入,然后在该网络上执行 flatMap 以获取N个网络,比我得到N*M个新网络等等。

但是因为我只从 1 个网络开始,当我这样做时

List<Network> data = Arrays.asList(new Network());
JavaRDD<Network> currentN = sc.parallelize(data);

在一项任务中,数据仅在一个 CPU 上并行化,因为 RDD 仅包含 1 个元素,因此存在问题。